Merge pull request #20669 from hongchaodeng/sched

Auto commit by PR queue bot
k8s-merge-robot 2016-02-27 00:58:55 -08:00
commit 03f356edb9
12 changed files with 219 additions and 433 deletions

View File

@@ -44,19 +44,12 @@ func calculateScore(requested int64, capacity int64, node string) int {
// Calculate the resource occupancy on a node. 'node' has information about the resources on the node.
// 'pods' is a list of pods currently scheduled on the node.
func calculateResourceOccupancy(pod *api.Pod, node api.Node, pods []*api.Pod) schedulerapi.HostPriority {
totalMilliCPU := int64(0)
totalMemory := int64(0)
func calculateResourceOccupancy(pod *api.Pod, node api.Node, nodeInfo *schedulercache.NodeInfo) schedulerapi.HostPriority {
totalMilliCPU := nodeInfo.NonZeroRequest().MilliCPU
totalMemory := nodeInfo.NonZeroRequest().Memory
capacityMilliCPU := node.Status.Allocatable.Cpu().MilliValue()
capacityMemory := node.Status.Allocatable.Memory().Value()
for _, existingPod := range pods {
for _, container := range existingPod.Spec.Containers {
cpu, memory := priorityutil.GetNonzeroRequests(&container.Resources.Requests)
totalMilliCPU += cpu
totalMemory += memory
}
}
// Add the resources requested by the current pod being scheduled.
// This also helps differentiate between differently sized, but empty, nodes.
for _, container := range pod.Spec.Containers {
@@ -93,7 +86,7 @@ func LeastRequestedPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo
list := schedulerapi.HostPriorityList{}
for _, node := range nodes.Items {
list = append(list, calculateResourceOccupancy(pod, node, nodeNameToInfo[node.Name].Pods()))
list = append(list, calculateResourceOccupancy(pod, node, nodeNameToInfo[node.Name]))
}
return list, nil
}
@@ -227,22 +220,15 @@ func BalancedResourceAllocation(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo
list := schedulerapi.HostPriorityList{}
for _, node := range nodes.Items {
list = append(list, calculateBalancedResourceAllocation(pod, node, nodeNameToInfo[node.Name].Pods()))
list = append(list, calculateBalancedResourceAllocation(pod, node, nodeNameToInfo[node.Name]))
}
return list, nil
}
func calculateBalancedResourceAllocation(pod *api.Pod, node api.Node, pods []*api.Pod) schedulerapi.HostPriority {
totalMilliCPU := int64(0)
totalMemory := int64(0)
func calculateBalancedResourceAllocation(pod *api.Pod, node api.Node, nodeInfo *schedulercache.NodeInfo) schedulerapi.HostPriority {
totalMilliCPU := nodeInfo.NonZeroRequest().MilliCPU
totalMemory := nodeInfo.NonZeroRequest().Memory
score := int(0)
for _, existingPod := range pods {
for _, container := range existingPod.Spec.Containers {
cpu, memory := priorityutil.GetNonzeroRequests(&container.Resources.Requests)
totalMilliCPU += cpu
totalMemory += memory
}
}
// Add the resources requested by the current pod being scheduled.
// This also helps differentiate between differently sized, but empty, nodes.
for _, container := range pod.Spec.Containers {
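Both hunks above make the same swap: the per-pod, per-container summation is gone, and the priority functions read a precomputed total from the cache. A minimal sketch of the NodeInfo bookkeeping this relies on; the NonZeroRequest accessor and its MilliCPU/Memory fields match the usage above, while the struct layout, the addPod helper, and the priorityutil import path are assumptions:

package schedulercache

import (
	"k8s.io/kubernetes/pkg/api"
	priorityutil "k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/priorities/util"
)

// Resource holds the two quantities the priority functions consume.
type Resource struct {
	MilliCPU int64
	Memory   int64
}

// NodeInfo caches the aggregate non-zero requests of a node's pods, so
// LeastRequestedPriority and BalancedResourceAllocation no longer walk
// every container of every pod on each scheduling pass.
type NodeInfo struct {
	pods           []*api.Pod
	nonZeroRequest Resource
}

// NonZeroRequest returns the cached totals.
func (n *NodeInfo) NonZeroRequest() Resource { return n.nonZeroRequest }

// addPod folds one pod's requests into the totals; the real cache would
// call something like this from its AddPod/UpdatePod event handlers.
func (n *NodeInfo) addPod(pod *api.Pod) {
	for i := range pod.Spec.Containers {
		cpu, mem := priorityutil.GetNonzeroRequests(&pod.Spec.Containers[i].Resources.Requests)
		n.nonZeroRequest.MilliCPU += cpu
		n.nonZeroRequest.Memory += mem
	}
	n.pods = append(n.pods, pod)
}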

View File

@@ -138,7 +138,6 @@ func TestZeroRequest(t *testing.T) {
list, err := scheduler.PrioritizeNodes(
test.pod,
nodeNameToInfo,
algorithm.FakePodLister(test.pods),
// This should match the configuration in defaultPriorities() in
// plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go if you want
// to test what's actually in production.

View File

@@ -25,6 +25,7 @@ import (
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
schedulertesting "k8s.io/kubernetes/plugin/pkg/scheduler/testing"
)
type fitPredicate func(pod *api.Pod, node *api.Node) (bool, error)
@@ -285,7 +286,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
for ii := range test.extenders {
extenders = append(extenders, &test.extenders[ii])
}
scheduler := NewGenericScheduler(test.predicates, test.prioritizers, extenders, algorithm.FakePodLister(test.pods), random)
scheduler := NewGenericScheduler(schedulertesting.PodsToCache(test.pods), test.predicates, test.prioritizers, extenders, random)
machine, err := scheduler.Schedule(test.pod, algorithm.FakeNodeLister(makeNodeList(test.nodes)))
if test.expectsErr {
if err == nil {

View File

@@ -42,6 +42,7 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)
const (
@@ -74,7 +75,7 @@ type ConfigFactory struct {
StopEverything chan struct{}
scheduledPodPopulator *framework.Controller
modeler scheduler.SystemModeler
schedulerCache schedulercache.Cache
// SchedulerName of a scheduler is used to select which pods will be
// processed by this scheduler, based on the pod's annotation key:
@@ -84,6 +85,9 @@ type ConfigFactory struct {
// Initializes the factory.
func NewConfigFactory(client *client.Client, schedulerName string) *ConfigFactory {
stopEverything := make(chan struct{})
schedulerCache := schedulercache.New(30*time.Second, stopEverything)
c := &ConfigFactory{
Client: client,
PodQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc),
@@ -95,12 +99,12 @@ func NewConfigFactory(client *client.Client, schedulerName string) *ConfigFactory {
ServiceLister: &cache.StoreToServiceLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
ControllerLister: &cache.StoreToReplicationControllerLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
ReplicaSetLister: &cache.StoreToReplicaSetLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
StopEverything: make(chan struct{}),
schedulerCache: schedulerCache,
StopEverything: stopEverything,
SchedulerName: schedulerName,
}
modeler := scheduler.NewSimpleModeler(&cache.StoreToPodLister{Store: c.PodQueue}, c.ScheduledPodLister)
c.modeler = modeler
c.PodLister = modeler.PodLister()
c.PodLister = schedulerCache
// On add/delete to the scheduled pods, remove from the assumed pods.
// We construct this here instead of in CreateFromKeys because
@@ -112,21 +116,49 @@ func NewConfigFactory(client *client.Client, schedulerName string) *ConfigFactory {
0,
framework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if pod, ok := obj.(*api.Pod); ok {
c.modeler.LockedAction(func() {
c.modeler.ForgetPod(pod)
})
pod, ok := obj.(*api.Pod)
if !ok {
glog.Errorf("cannot convert to *api.Pod")
return
}
if err := schedulerCache.AddPod(pod); err != nil {
glog.Errorf("scheduler cache AddPod failed: %v", err)
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldPod, ok := oldObj.(*api.Pod)
if !ok {
glog.Errorf("cannot convert to *api.Pod")
return
}
newPod, ok := newObj.(*api.Pod)
if !ok {
glog.Errorf("cannot convert to *api.Pod")
return
}
if err := schedulerCache.UpdatePod(oldPod, newPod); err != nil {
glog.Errorf("scheduler cache UpdatePod failed: %v", err)
}
},
DeleteFunc: func(obj interface{}) {
c.modeler.LockedAction(func() {
switch t := obj.(type) {
case *api.Pod:
c.modeler.ForgetPod(t)
case cache.DeletedFinalStateUnknown:
c.modeler.ForgetPodByKey(t.Key)
var pod *api.Pod
switch t := obj.(type) {
case *api.Pod:
pod = t
case cache.DeletedFinalStateUnknown:
var ok bool
pod, ok = t.Obj.(*api.Pod)
if !ok {
glog.Errorf("cannot convert to *api.Pod")
return
}
})
default:
glog.Errorf("cannot convert to *api.Pod")
return
}
if err := schedulerCache.RemovePod(pod); err != nil {
glog.Errorf("scheduler cache RemovePod failed: %v", err)
}
},
},
)
@@ -241,7 +273,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
r := rand.New(rand.NewSource(time.Now().UnixNano()))
algo := scheduler.NewGenericScheduler(predicateFuncs, priorityConfigs, extenders, f.PodLister, r)
algo := scheduler.NewGenericScheduler(f.schedulerCache, predicateFuncs, priorityConfigs, extenders, r)
podBackoff := podBackoff{
perPodBackoff: map[types.NamespacedName]*backoffEntry{},
@@ -252,7 +284,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
}
return &scheduler.Config{
Modeler: f.modeler,
SchedulerCache: f.schedulerCache,
// The scheduler only needs to consider schedulable nodes.
NodeLister: f.NodeLister.NodeCondition(getNodeConditionPredicate()),
Algorithm: algo,
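The net effect in the factory: informer events now feed schedulerCache, the cache doubles as the PodLister, and both the algorithm and the Config consume it. The contract this implies can be read off the test fakes added later in this commit (FakeCache, PodsToCache). A sketch of that interface, reusing the NodeInfo sketched earlier; the real schedulercache.Cache may well carry more methods:

package schedulercache

import (
	"k8s.io/kubernetes/pkg/api"
	"k8s.io/kubernetes/pkg/labels"
)

// Cache ingests informer events, tracks optimistically assumed bindings,
// and serves snapshots to the scheduling algorithm.
type Cache interface {
	// Called from the scheduler's bind path; see the scheduler.go diff below.
	AssumePodIfBindSucceed(pod *api.Pod, bind func() bool) error
	// Called from the scheduled-pod informer handlers above.
	AddPod(pod *api.Pod) error
	UpdatePod(oldPod, newPod *api.Pod) error
	RemovePod(pod *api.Pod) error
	// Snapshot consumed once per Schedule call by the generic scheduler.
	GetNodeNameToInfoMap() (map[string]*NodeInfo, error)
	// Lets the cache stand in as the factory's PodLister.
	List(labels.Selector) ([]*api.Pod, error)
}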

View File

@@ -25,7 +25,6 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
@@ -55,10 +54,10 @@
}
type genericScheduler struct {
cache schedulercache.Cache
predicates map[string]algorithm.FitPredicate
prioritizers []algorithm.PriorityConfig
extenders []algorithm.SchedulerExtender
pods algorithm.PodLister
random *rand.Rand
randomLock sync.Mutex
}
@@ -75,13 +74,12 @@ func (g *genericScheduler) Schedule(pod *api.Pod, nodeLister algorithm.NodeLister
return "", ErrNoNodesAvailable
}
// TODO: we should compute this once and dynamically update it using Watch, not constantly re-compute.
// But at least we're now only doing it in one place
pods, err := g.pods.List(labels.Everything())
// Used for all fit and priority funcs.
nodeNameToInfo, err := g.cache.GetNodeNameToInfoMap()
if err != nil {
return "", err
}
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(pods)
filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, nodeNameToInfo, g.predicates, nodes, g.extenders)
if err != nil {
return "", err
@@ -94,7 +92,7 @@ func (g *genericScheduler) Schedule(pod *api.Pod, nodeLister algorithm.NodeLister
}
}
priorityList, err := PrioritizeNodes(pod, nodeNameToInfo, g.pods, g.prioritizers, algorithm.FakeNodeLister(filteredNodes), g.extenders)
priorityList, err := PrioritizeNodes(pod, nodeNameToInfo, g.prioritizers, algorithm.FakeNodeLister(filteredNodes), g.extenders)
if err != nil {
return "", err
}
@@ -188,7 +186,7 @@ func findNodesThatFit(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo
// Each priority function can also have its own weight
// The node scores returned by the priority function are multiplied by the weights to get weighted scores
// All scores are finally combined (added) to get the total weighted scores of all nodes
func PrioritizeNodes(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, podLister algorithm.PodLister, priorityConfigs []algorithm.PriorityConfig, nodeLister algorithm.NodeLister, extenders []algorithm.SchedulerExtender) (schedulerapi.HostPriorityList, error) {
func PrioritizeNodes(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo, priorityConfigs []algorithm.PriorityConfig, nodeLister algorithm.NodeLister, extenders []algorithm.SchedulerExtender) (schedulerapi.HostPriorityList, error) {
result := schedulerapi.HostPriorityList{}
// If no priority configs are provided, then the EqualPriority function is applied
@@ -288,12 +286,12 @@ func EqualPriority(_ *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInfo
return result, nil
}
func NewGenericScheduler(predicates map[string]algorithm.FitPredicate, prioritizers []algorithm.PriorityConfig, extenders []algorithm.SchedulerExtender, pods algorithm.PodLister, random *rand.Rand) algorithm.ScheduleAlgorithm {
func NewGenericScheduler(cache schedulercache.Cache, predicates map[string]algorithm.FitPredicate, prioritizers []algorithm.PriorityConfig, extenders []algorithm.SchedulerExtender, random *rand.Rand) algorithm.ScheduleAlgorithm {
return &genericScheduler{
cache: cache,
predicates: predicates,
prioritizers: prioritizers,
extenders: extenders,
pods: pods,
random: random,
}
}
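PrioritizeNodes keeps the weight-and-sum behavior the comment above describes; only the podLister parameter is dropped. A worked illustration of that combination, assuming schedulerapi.HostPriority carries Host and Score fields and algorithm.PriorityConfig carries an integer Weight (neither definition appears in this diff):

package scheduler

import (
	"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
	schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
)

// combineScores illustrates the aggregation PrioritizeNodes performs: each
// priority function's per-node scores are scaled by that function's weight
// and summed into one total per node.
func combineScores(configs []algorithm.PriorityConfig, perFunction []schedulerapi.HostPriorityList) schedulerapi.HostPriorityList {
	totals := map[string]int{}
	for i, list := range perFunction {
		for _, hp := range list {
			totals[hp.Host] += hp.Score * configs[i].Weight
		}
	}
	combined := schedulerapi.HostPriorityList{}
	for host, score := range totals {
		combined = append(combined, schedulerapi.HostPriority{Host: host, Score: score})
	}
	return combined
}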

View File

@@ -28,6 +28,7 @@ import (
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
schedulertesting "k8s.io/kubernetes/plugin/pkg/scheduler/testing"
)
func falsePredicate(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
@@ -256,7 +257,7 @@ func TestGenericScheduler(t *testing.T) {
for _, test := range tests {
random := rand.New(rand.NewSource(0))
scheduler := NewGenericScheduler(test.predicates, test.prioritizers, []algorithm.SchedulerExtender{}, algorithm.FakePodLister(test.pods), random)
scheduler := NewGenericScheduler(schedulertesting.PodsToCache(test.pods), test.predicates, test.prioritizers, []algorithm.SchedulerExtender{}, random)
machine, err := scheduler.Schedule(test.pod, algorithm.FakeNodeLister(makeNodeList(test.nodes)))
if test.expectsErr {
if err == nil {

View File

@@ -1,197 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"fmt"
"strings"
"sync"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"github.com/golang/glog"
)
var (
_ = SystemModeler(&FakeModeler{})
_ = SystemModeler(&SimpleModeler{})
)
// ExtendedPodLister: SimpleModeler needs to be able to check for a pod's
// existence in addition to listing the pods.
type ExtendedPodLister interface {
algorithm.PodLister
Exists(pod *api.Pod) (bool, error)
}
// actionLocker implements lockedAction (so the fake and SimpleModeler can both
// use it)
type actionLocker struct {
sync.Mutex
}
// LockedAction serializes calls of whatever is passed as 'do'.
func (a *actionLocker) LockedAction(do func()) {
a.Lock()
defer a.Unlock()
do()
}
// FakeModeler implements the SystemModeler interface.
type FakeModeler struct {
AssumePodFunc func(pod *api.Pod)
ForgetPodFunc func(pod *api.Pod)
ForgetPodByKeyFunc func(key string)
actionLocker
}
// AssumePod calls the function variable if it is not nil.
func (f *FakeModeler) AssumePod(pod *api.Pod) {
if f.AssumePodFunc != nil {
f.AssumePodFunc(pod)
}
}
// ForgetPod calls the function variable if it is not nil.
func (f *FakeModeler) ForgetPod(pod *api.Pod) {
if f.ForgetPodFunc != nil {
f.ForgetPodFunc(pod)
}
}
// ForgetPodByKey calls the function variable if it is not nil.
func (f *FakeModeler) ForgetPodByKey(key string) {
if f.ForgetPodByKeyFunc != nil {
f.ForgetPodByKeyFunc(key)
}
}
// SimpleModeler implements the SystemModeler interface with a timed pod cache.
type SimpleModeler struct {
queuedPods ExtendedPodLister
scheduledPods ExtendedPodLister
// assumedPods holds the pods that we think we've scheduled, but that
// haven't yet shown up in the scheduledPods variable.
// TODO: periodically clear this.
assumedPods *cache.StoreToPodLister
actionLocker
}
// NewSimpleModeler returns a new SimpleModeler.
// queuedPods: a PodLister that will return pods that have not been scheduled yet.
// scheduledPods: a PodLister that will return pods that we know for sure have been scheduled.
func NewSimpleModeler(queuedPods, scheduledPods ExtendedPodLister) *SimpleModeler {
return &SimpleModeler{
queuedPods: queuedPods,
scheduledPods: scheduledPods,
assumedPods: &cache.StoreToPodLister{
Store: cache.NewTTLStore(cache.MetaNamespaceKeyFunc, 30*time.Second),
},
}
}
func (s *SimpleModeler) AssumePod(pod *api.Pod) {
s.assumedPods.Add(pod)
}
func (s *SimpleModeler) ForgetPod(pod *api.Pod) {
s.assumedPods.Delete(pod)
}
func (s *SimpleModeler) ForgetPodByKey(key string) {
s.assumedPods.Delete(cache.ExplicitKey(key))
}
// Extract names for readable logging.
func podNames(pods []*api.Pod) []string {
out := make([]string, len(pods))
for i := range pods {
out[i] = fmt.Sprintf("'%v/%v (%v)'", pods[i].Namespace, pods[i].Name, pods[i].UID)
}
return out
}
func (s *SimpleModeler) listPods(selector labels.Selector) (pods []*api.Pod, err error) {
assumed, err := s.assumedPods.List(selector)
if err != nil {
return nil, err
}
// Since the assumed list will be short, just check every one.
// Goal here is to stop making assumptions about a pod once it shows
// up in one of these other lists.
for _, pod := range assumed {
qExist, err := s.queuedPods.Exists(pod)
if err != nil {
return nil, err
}
if qExist {
s.assumedPods.Store.Delete(pod)
continue
}
sExist, err := s.scheduledPods.Exists(pod)
if err != nil {
return nil, err
}
if sExist {
s.assumedPods.Store.Delete(pod)
continue
}
}
scheduled, err := s.scheduledPods.List(selector)
if err != nil {
return nil, err
}
// Listing purges the ttl cache and re-gets, in case we deleted any entries.
assumed, err = s.assumedPods.List(selector)
if err != nil {
return nil, err
}
if len(assumed) == 0 {
return scheduled, nil
}
glog.V(2).Infof(
"listing pods: [%v] assumed to exist in addition to %v known pods.",
strings.Join(podNames(assumed), ","),
len(scheduled),
)
return append(scheduled, assumed...), nil
}
// PodLister returns a PodLister that will list pods that we think we have scheduled in
// addition to pods that we know have been scheduled.
func (s *SimpleModeler) PodLister() algorithm.PodLister {
return simpleModelerPods{s}
}
// simpleModelerPods is an adaptor so that SimpleModeler can be a PodLister.
type simpleModelerPods struct {
simpleModeler *SimpleModeler
}
// List returns pods known and assumed to exist.
func (s simpleModelerPods) List(selector labels.Selector) (pods []*api.Pod, err error) {
s.simpleModeler.LockedAction(
func() { pods, err = s.simpleModeler.listPods(selector) })
return
}

View File

@@ -1,111 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"testing"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/labels"
)
type nn struct {
namespace, name string
}
type names []nn
func (ids names) list() []*api.Pod {
out := make([]*api.Pod, 0, len(ids))
for _, id := range ids {
out = append(out, &api.Pod{
ObjectMeta: api.ObjectMeta{
Namespace: id.namespace,
Name: id.name,
},
})
}
return out
}
func (ids names) has(pod *api.Pod) bool {
for _, id := range ids {
if pod.Namespace == id.namespace && pod.Name == id.name {
return true
}
}
return false
}
func TestModeler(t *testing.T) {
table := []struct {
queuedPods []*api.Pod
scheduledPods []*api.Pod
assumedPods []*api.Pod
expectPods names
}{
{
queuedPods: names{}.list(),
scheduledPods: names{{"default", "foo"}, {"custom", "foo"}}.list(),
assumedPods: names{{"default", "foo"}}.list(),
expectPods: names{{"default", "foo"}, {"custom", "foo"}},
}, {
queuedPods: names{}.list(),
scheduledPods: names{{"default", "foo"}}.list(),
assumedPods: names{{"default", "foo"}, {"custom", "foo"}}.list(),
expectPods: names{{"default", "foo"}, {"custom", "foo"}},
}, {
queuedPods: names{{"custom", "foo"}}.list(),
scheduledPods: names{{"default", "foo"}}.list(),
assumedPods: names{{"default", "foo"}, {"custom", "foo"}}.list(),
expectPods: names{{"default", "foo"}},
},
}
for _, item := range table {
q := &cache.StoreToPodLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}
for _, pod := range item.queuedPods {
q.Store.Add(pod)
}
s := &cache.StoreToPodLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}
for _, pod := range item.scheduledPods {
s.Store.Add(pod)
}
m := NewSimpleModeler(q, s)
for _, pod := range item.assumedPods {
m.AssumePod(pod)
}
list, err := m.PodLister().List(labels.Everything())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
found := 0
for _, pod := range list {
if item.expectPods.has(pod) {
found++
} else {
t.Errorf("found unexpected pod %#v", pod)
}
}
if e, a := item.expectPods, found; len(e) != a {
t.Errorf("Expected pods:\n%+v\nFound pods:\n%s\n", podNames(e.list()), podNames(list))
}
}
}

View File

@@ -27,6 +27,7 @@ import (
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"k8s.io/kubernetes/plugin/pkg/scheduler/metrics"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
"github.com/golang/glog"
)
@@ -36,32 +37,6 @@ type Binder interface {
Bind(binding *api.Binding) error
}
// SystemModeler can help scheduler produce a model of the system that
// anticipates reality. For example, if the scheduler has pods A and B both
// using hostPort 80, then after binding A to machine M it should not bind B
// to machine M during the window in which it has not yet observed the
// binding of A take effect.
//
// Since the model is only an optimization, it's expected to handle
// any errors itself without sending them back to the scheduler.
type SystemModeler interface {
// AssumePod assumes that the given pod exists in the system.
// The assumption should last until the system confirms the
// assumption or disconfirms it.
AssumePod(pod *api.Pod)
// ForgetPod removes a pod assumption. (It won't make the model
// show the absence of the given pod if the pod is in the scheduled
// pods list!)
ForgetPod(pod *api.Pod)
ForgetPodByKey(key string)
// For serializing calls to Assume/ForgetPod: imagine you want to add
// a pod if and only if a bind succeeds, but also remove a pod if it is deleted.
// TODO: if SystemModeler begins modeling things other than pods, this
// should probably be parameterized or specialized for pods.
LockedAction(f func())
}
// Scheduler watches for new unscheduled pods. It attempts to find
// nodes that they fit on and writes bindings back to the api server.
type Scheduler struct {
@@ -69,12 +44,12 @@ type Scheduler struct {
}
type Config struct {
// It is expected that changes made via modeler will be observed
// It is expected that changes made via SchedulerCache will be observed
// by NodeLister and Algorithm.
Modeler SystemModeler
NodeLister algorithm.NodeLister
Algorithm algorithm.ScheduleAlgorithm
Binder Binder
SchedulerCache schedulercache.Cache
NodeLister algorithm.NodeLister
Algorithm algorithm.ScheduleAlgorithm
Binder Binder
// NextPod should be a function that blocks until the next pod
// is available. We don't use a channel for this, because scheduling
@@ -129,24 +104,25 @@ func (s *Scheduler) scheduleOne() {
},
}
// We want to add the pod to the model if and only if the bind succeeds,
// but we don't want to race with any deletions, which happen asynchronously.
s.config.Modeler.LockedAction(func() {
bindAction := func() bool {
bindingStart := time.Now()
err := s.config.Binder.Bind(b)
if err != nil {
glog.V(1).Infof("Failed to bind pod: %+v", err)
s.config.Recorder.Eventf(pod, api.EventTypeNormal, "FailedScheduling", "Binding rejected: %v", err)
s.config.Error(pod, err)
return
return false
}
metrics.BindingLatency.Observe(metrics.SinceInMicroseconds(bindingStart))
s.config.Recorder.Eventf(pod, api.EventTypeNormal, "Scheduled", "Successfully assigned %v to %v", pod.Name, dest)
// tell the model to assume that this binding took effect.
assumed := *pod
assumed.Spec.NodeName = dest
s.config.Modeler.AssumePod(&assumed)
})
return true
}
assumed := *pod
assumed.Spec.NodeName = dest
// We want to assume the pod if and only if the bind succeeds,
// but we don't want to race with any deletions, which happen asynchronously.
s.config.SchedulerCache.AssumePodIfBindSucceed(&assumed, bindAction)
metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
}
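The comment pins down the invariant AssumePodIfBindSucceed must provide: assuming the pod has to be atomic with a successful bind, so that an asynchronous RemovePod cannot land in between. A self-contained sketch of a cache honoring that invariant; the mutex, map, and key scheme are assumptions rather than the real schedulercache internals:

package schedulercache

import (
	"sync"

	"k8s.io/kubernetes/pkg/api"
)

type exampleCache struct {
	mu      sync.Mutex
	assumed map[string]*api.Pod // keyed by namespace/name
}

func newExampleCache() *exampleCache {
	return &exampleCache{assumed: map[string]*api.Pod{}}
}

// AssumePodIfBindSucceed runs bind() while holding the cache lock, so a
// concurrent RemovePod (which would take the same lock) cannot interleave
// between a successful bind and the pod being recorded as assumed.
func (c *exampleCache) AssumePodIfBindSucceed(pod *api.Pod, bind func() bool) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	if !bind() {
		// Bind failed; nothing to assume. The bindAction closure above
		// already reported the error to the scheduler.
		return nil
	}
	c.assumed[pod.Namespace+"/"+pod.Name] = pod
	return nil
}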

View File

@@ -23,13 +23,18 @@ import (
"testing"
"time"
"sync"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
schedulertesting "k8s.io/kubernetes/plugin/pkg/scheduler/testing"
)
type fakeBinder struct {
@@ -109,8 +114,8 @@ func TestScheduler(t *testing.T) {
var gotAssumedPod *api.Pod
var gotBinding *api.Binding
c := &Config{
Modeler: &FakeModeler{
AssumePodFunc: func(pod *api.Pod) {
SchedulerCache: &schedulertesting.FakeCache{
AssumeFunc: func(pod *api.Pod) {
gotAssumedPod = pod
},
},
@@ -161,42 +166,30 @@ func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) {
eventBroadcaster := record.NewBroadcaster()
defer eventBroadcaster.StartLogging(t.Logf).Stop()
// Setup modeler so we control the contents of all 3 stores: assumed,
// scheduled and queued
// Set up stores to test the pod's workflow:
// - queuedPodStore: pods queued before processing
// - scheduledPodStore: pods that have a scheduling decision
scheduledPodStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
scheduledPodLister := &cache.StoreToPodLister{Store: scheduledPodStore}
queuedPodStore := cache.NewFIFO(cache.MetaNamespaceKeyFunc)
queuedPodLister := &cache.StoreToPodLister{Store: queuedPodStore}
modeler := NewSimpleModeler(queuedPodLister, scheduledPodLister)
// Create a fake clock used to timestamp entries and calculate ttl. Nothing
// will expire till we flip to something older than the ttl, at which point
// all entries inserted with fakeTime will expire.
ttl := 30 * time.Second
fakeTime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
fakeClock := util.NewFakeClock(fakeTime)
ttlPolicy := &cache.TTLPolicy{Ttl: ttl, Clock: fakeClock}
assumedPodsStore := cache.NewFakeExpirationStore(
cache.MetaNamespaceKeyFunc, nil, ttlPolicy, fakeClock)
modeler.assumedPods = &cache.StoreToPodLister{Store: assumedPodsStore}
// Port is the easiest way to cause a fit predicate failure
podPort := 8080
firstPod := podWithPort("foo", "", podPort)
stop := make(chan struct{})
defer close(stop)
cache := schedulercache.New(1*time.Second, stop)
// Create the scheduler config
algo := NewGenericScheduler(
cache,
map[string]algorithm.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts},
[]algorithm.PriorityConfig{},
[]algorithm.SchedulerExtender{},
modeler.PodLister(),
rand.New(rand.NewSource(time.Now().UnixNano())))
var gotBinding *api.Binding
c := &Config{
Modeler: modeler,
SchedulerCache: cache,
NodeLister: algorithm.FakeNodeLister(
api.NodeList{Items: []api.Node{{ObjectMeta: api.ObjectMeta{Name: "machine1"}}}},
),
@@ -243,10 +236,6 @@ func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) {
if exists {
t.Errorf("Did not expect a queued pod, found %+v", pod)
}
pod, exists, _ = assumedPodsStore.GetByKey("foo")
if !exists {
t.Errorf("Assumed pod store should contain stale pod")
}
expectBind := &api.Binding{
ObjectMeta: api.ObjectMeta{Name: "foo"},
@@ -260,10 +249,6 @@ func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) {
events.Stop()
scheduledPodStore.Delete(pod)
_, exists, _ = assumedPodsStore.Get(pod)
if !exists {
t.Errorf("Expected pod %#v in assumed pod store", pod)
}
secondPod := podWithPort("bar", "", podPort)
queuedPodStore.Add(secondPod)
@@ -271,10 +256,26 @@ func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) {
// scheduledPodStore: []
// assumedPods: [foo:8080]
var wg sync.WaitGroup
wg.Add(1)
// wait for the assumed pod to expire
go func() {
for {
pods, err := cache.List(labels.Everything())
if err != nil {
// t.Fatalf must not be called from a non-test goroutine; report and bail out.
t.Errorf("cache.List failed: %v", err)
wg.Done()
return
}
if len(pods) == 0 {
wg.Done()
return
}
time.Sleep(1 * time.Second)
}
}()
wg.Wait()
// Second scheduling pass will fail to schedule if the store hasn't expired
// the deleted pod. This would normally happen with a timeout.
//expirationPolicy.NeverExpire = util.NewStringSet()
fakeClock.Step(ttl + 1)
called = make(chan struct{})
events = eventBroadcaster.StartEventWatcher(func(e *api.Event) {
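The rewritten test polls cache.List until the assumed pod disappears instead of stepping a fake clock, because expiry now lives inside the cache: schedulercache.New(ttl, stop) evidently starts background cleanup of assumed pods that the informers never confirmed. A hypothetical sketch of such a loop; tick period, field names, and locking are all guesses:

package schedulercache

import (
	"sync"
	"time"
)

type ttlCache struct {
	mu       sync.Mutex
	deadline map[string]time.Time // assumed-pod key -> expiry time
}

// cleanupLoop drops assumed pods whose confirming informer event never
// arrived before their deadline; stop is the channel NewConfigFactory
// passes as StopEverything.
func (c *ttlCache) cleanupLoop(ttl time.Duration, stop <-chan struct{}) {
	ticker := time.NewTicker(ttl / 2) // period assumed
	defer ticker.Stop()
	for {
		select {
		case now := <-ticker.C:
			c.mu.Lock()
			for key, d := range c.deadline {
				if now.After(d) {
					delete(c.deadline, key)
				}
			}
			c.mu.Unlock()
		case <-stop:
			return
		}
	}
}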

View File

@@ -0,0 +1,48 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package schedulercache
import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)
// FakeCache is used for testing
type FakeCache struct {
AssumeFunc func(*api.Pod)
}
func (f *FakeCache) AssumePodIfBindSucceed(pod *api.Pod, bind func() bool) error {
if !bind() {
return nil
}
f.AssumeFunc(pod)
return nil
}
func (f *FakeCache) AddPod(pod *api.Pod) error { return nil }
func (f *FakeCache) UpdatePod(oldPod, newPod *api.Pod) error { return nil }
func (f *FakeCache) RemovePod(pod *api.Pod) error { return nil }
func (f *FakeCache) GetNodeNameToInfoMap() (map[string]*schedulercache.NodeInfo, error) {
return nil, nil
}
func (f *FakeCache) List(s labels.Selector) ([]*api.Pod, error) { return nil, nil }

View File

@@ -0,0 +1,52 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package schedulercache
import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)
// PodsToCache is used for testing
type PodsToCache []*api.Pod
func (p PodsToCache) AssumePodIfBindSucceed(pod *api.Pod, bind func() bool) error {
if !bind() {
return nil
}
return nil
}
func (p PodsToCache) AddPod(pod *api.Pod) error { return nil }
func (p PodsToCache) UpdatePod(oldPod, newPod *api.Pod) error { return nil }
func (p PodsToCache) RemovePod(pod *api.Pod) error { return nil }
func (p PodsToCache) GetNodeNameToInfoMap() (map[string]*schedulercache.NodeInfo, error) {
return schedulercache.CreateNodeNameToInfoMap(p), nil
}
func (p PodsToCache) List(s labels.Selector) (selected []*api.Pod, err error) {
for _, pod := range p {
if s.Matches(labels.Set(pod.Labels)) {
selected = append(selected, pod)
}
}
return selected, nil
}