Scheduler: replace system modeler with scheduler cache

Hongchao Deng 2016-03-13 21:58:28 -07:00
parent b26d6cb706
commit ae88f08af0
15 changed files with 308 additions and 457 deletions

View File

@ -44,19 +44,12 @@ func calculateScore(requested int64, capacity int64, node string) int {
// Calculate the resource occupancy on a node. 'node' has information about the resources on the node.
// 'pods' is a list of pods currently scheduled on the node.
func calculateResourceOccupancy(pod *api.Pod, node api.Node, pods []*api.Pod) schedulerapi.HostPriority {
totalMilliCPU := int64(0)
totalMemory := int64(0)
func calculateResourceOccupancy(pod *api.Pod, node api.Node, nodeInfo *schedulercache.NodeInfo) schedulerapi.HostPriority {
totalMilliCPU := nodeInfo.NonZeroRequest().MilliCPU
totalMemory := nodeInfo.NonZeroRequest().Memory
capacityMilliCPU := node.Status.Allocatable.Cpu().MilliValue()
capacityMemory := node.Status.Allocatable.Memory().Value()
for _, existingPod := range pods {
for _, container := range existingPod.Spec.Containers {
cpu, memory := priorityutil.GetNonzeroRequests(&container.Resources.Requests)
totalMilliCPU += cpu
totalMemory += memory
}
}
// Add the resources requested by the current pod being scheduled.
// This also helps differentiate between differently sized, but empty, nodes.
for _, container := range pod.Spec.Containers {
@ -93,7 +86,7 @@ func LeastRequestedPriority(pod *api.Pod, nodeNameToInfo map[string]*schedulerca
list := schedulerapi.HostPriorityList{}
for _, node := range nodes.Items {
list = append(list, calculateResourceOccupancy(pod, node, nodeNameToInfo[node.Name].Pods()))
list = append(list, calculateResourceOccupancy(pod, node, nodeNameToInfo[node.Name]))
}
return list, nil
}
@ -227,22 +220,15 @@ func BalancedResourceAllocation(pod *api.Pod, nodeNameToInfo map[string]*schedul
list := schedulerapi.HostPriorityList{}
for _, node := range nodes.Items {
list = append(list, calculateBalancedResourceAllocation(pod, node, nodeNameToInfo[node.Name].Pods()))
list = append(list, calculateBalancedResourceAllocation(pod, node, nodeNameToInfo[node.Name]))
}
return list, nil
}
func calculateBalancedResourceAllocation(pod *api.Pod, node api.Node, pods []*api.Pod) schedulerapi.HostPriority {
totalMilliCPU := int64(0)
totalMemory := int64(0)
func calculateBalancedResourceAllocation(pod *api.Pod, node api.Node, nodeInfo *schedulercache.NodeInfo) schedulerapi.HostPriority {
totalMilliCPU := nodeInfo.NonZeroRequest().MilliCPU
totalMemory := nodeInfo.NonZeroRequest().Memory
score := int(0)
for _, existingPod := range pods {
for _, container := range existingPod.Spec.Containers {
cpu, memory := priorityutil.GetNonzeroRequests(&container.Resources.Requests)
totalMilliCPU += cpu
totalMemory += memory
}
}
// Add the resources requested by the current pod being scheduled.
// This also helps differentiate between differently sized, but empty, nodes.
for _, container := range pod.Spec.Containers {
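
The per-pod summation removed above now lives in the cache: NodeInfo keeps a running non-zero-request total that is updated as pods are added. A minimal sketch of that bookkeeping, assuming NodeInfo carries the nonzeroRequest and pods fields visible in the cache tests further down (the requestedResource total is omitted here for brevity; method names are illustrative):

func (n *NodeInfo) addPod(pod *api.Pod) {
	for _, container := range pod.Spec.Containers {
		// Same helper the old priority loop used: substitutes defaults for zero requests.
		cpu, mem := priorityutil.GetNonzeroRequests(&container.Resources.Requests)
		n.nonzeroRequest.MilliCPU += cpu
		n.nonzeroRequest.Memory += mem
	}
	n.pods = append(n.pods, pod)
}

// NonZeroRequest exposes the aggregate so LeastRequestedPriority and
// BalancedResourceAllocation no longer re-walk every pod on the node.
func (n *NodeInfo) NonZeroRequest() Resource {
	return *n.nonzeroRequest
}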

View File

@ -140,7 +140,6 @@ func TestZeroRequest(t *testing.T) {
list, err := scheduler.PrioritizeNodes(
test.pod,
nodeNameToInfo,
algorithm.FakePodLister(test.pods),
// This should match the configuration in defaultPriorities() in
// plugin/pkg/scheduler/algorithmprovider/defaults/defaults.go if you want
// to test what's actually in production.

View File

@ -25,6 +25,7 @@ import (
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
schedulertesting "k8s.io/kubernetes/plugin/pkg/scheduler/testing"
)
type fitPredicate func(pod *api.Pod, node *api.Node) (bool, error)
@ -285,7 +286,7 @@ func TestGenericSchedulerWithExtenders(t *testing.T) {
for ii := range test.extenders {
extenders = append(extenders, &test.extenders[ii])
}
scheduler := NewGenericScheduler(test.predicates, test.prioritizers, extenders, algorithm.FakePodLister(test.pods), random)
scheduler := NewGenericScheduler(schedulertesting.PodsToCache(test.pods), test.predicates, test.prioritizers, extenders, random)
machine, err := scheduler.Schedule(test.pod, algorithm.FakeNodeLister(makeNodeList(test.nodes)))
if test.expectsErr {
if err == nil {

View File

@ -42,6 +42,7 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/apis/extensions"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)
const (
@ -74,7 +75,7 @@ type ConfigFactory struct {
StopEverything chan struct{}
scheduledPodPopulator *framework.Controller
modeler scheduler.SystemModeler
schedulerCache schedulercache.Cache
// SchedulerName of a scheduler is used to select which pods will be
// processed by this scheduler, based on pods's annotation key:
@ -84,6 +85,9 @@ type ConfigFactory struct {
// Initializes the factory.
func NewConfigFactory(client *client.Client, schedulerName string) *ConfigFactory {
stopEverything := make(chan struct{})
schedulerCache := schedulercache.New(30*time.Second, stopEverything)
c := &ConfigFactory{
Client: client,
PodQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc),
@ -95,12 +99,12 @@ func NewConfigFactory(client *client.Client, schedulerName string) *ConfigFactor
ServiceLister: &cache.StoreToServiceLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
ControllerLister: &cache.StoreToReplicationControllerLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
ReplicaSetLister: &cache.StoreToReplicaSetLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)},
StopEverything: make(chan struct{}),
schedulerCache: schedulerCache,
StopEverything: stopEverything,
SchedulerName: schedulerName,
}
modeler := scheduler.NewSimpleModeler(&cache.StoreToPodLister{Store: c.PodQueue}, c.ScheduledPodLister)
c.modeler = modeler
c.PodLister = modeler.PodLister()
c.PodLister = schedulerCache
// On add/delete to the scheduled pods, remove from the assumed pods.
// We construct this here instead of in CreateFromKeys because
@ -112,21 +116,49 @@ func NewConfigFactory(client *client.Client, schedulerName string) *ConfigFactor
0,
framework.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
if pod, ok := obj.(*api.Pod); ok {
c.modeler.LockedAction(func() {
c.modeler.ForgetPod(pod)
})
pod, ok := obj.(*api.Pod)
if !ok {
glog.Errorf("cannot convert to *api.Pod")
return
}
if err := schedulerCache.AddPod(pod); err != nil {
glog.Errorf("scheduler cache AddPod failed: %v", err)
}
},
UpdateFunc: func(oldObj, newObj interface{}) {
oldPod, ok := oldObj.(*api.Pod)
if !ok {
glog.Errorf("cannot convert to *api.Pod")
return
}
newPod, ok := newObj.(*api.Pod)
if !ok {
glog.Errorf("cannot convert to *api.Pod")
return
}
if err := schedulerCache.UpdatePod(oldPod, newPod); err != nil {
glog.Errorf("scheduler cache UpdatePod failed: %v", err)
}
},
DeleteFunc: func(obj interface{}) {
c.modeler.LockedAction(func() {
switch t := obj.(type) {
case *api.Pod:
c.modeler.ForgetPod(t)
case cache.DeletedFinalStateUnknown:
c.modeler.ForgetPodByKey(t.Key)
var pod *api.Pod
switch t := obj.(type) {
case *api.Pod:
pod = t
case cache.DeletedFinalStateUnknown:
var ok bool
pod, ok = t.Obj.(*api.Pod)
if !ok {
glog.Errorf("cannot convert to *api.Pod")
return
}
})
default:
glog.Errorf("cannot convert to *api.Pod")
return
}
if err := schedulerCache.RemovePod(pod); err != nil {
glog.Errorf("scheduler cache RemovePod failed: %v", err)
}
},
},
)
@ -241,7 +273,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
r := rand.New(rand.NewSource(time.Now().UnixNano()))
algo := scheduler.NewGenericScheduler(predicateFuncs, priorityConfigs, extenders, f.PodLister, r)
algo := scheduler.NewGenericScheduler(f.schedulerCache, predicateFuncs, priorityConfigs, extenders, r)
podBackoff := podBackoff{
perPodBackoff: map[types.NamespacedName]*backoffEntry{},
@ -252,7 +284,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys sets.String,
}
return &scheduler.Config{
Modeler: f.modeler,
SchedulerCache: f.schedulerCache,
// The scheduler only needs to consider schedulable nodes.
NodeLister: f.NodeLister.NodeCondition(getNodeConditionPredicate()),
Algorithm: algo,
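
schedulercache.New above appears only at its call sites. A plausible shape for it, assuming it wraps the newSchedulerCache constructor from the cache diff further down and runs a periodic expiry sweep until stopEverything is closed (the cleanupPeriod constant and the goroutine structure are assumptions):

func New(ttl time.Duration, stop chan struct{}) Cache {
	c := newSchedulerCache(ttl, cleanupPeriod, stop)
	go func() {
		ticker := time.NewTicker(cleanupPeriod)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				c.cleanupAssumedPods(time.Now()) // expire assumed pods whose deadline has passed
			case <-stop:
				return
			}
		}
	}()
	return c
}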

View File

@ -26,7 +26,6 @@ import (
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util/errors"
"k8s.io/kubernetes/pkg/util/sets"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
@ -56,6 +55,7 @@ func (f *FitError) Error() string {
}
type genericScheduler struct {
cache schedulercache.Cache
predicates map[string]algorithm.FitPredicate
prioritizers []algorithm.PriorityConfig
extenders []algorithm.SchedulerExtender
@ -77,13 +77,12 @@ func (g *genericScheduler) Schedule(pod *api.Pod, nodeLister algorithm.NodeListe
return "", ErrNoNodesAvailable
}
// TODO: we should compute this once and dynamically update it using Watch, not constantly re-compute.
// But at least we're now only doing it in one place
pods, err := g.pods.List(labels.Everything())
// Used for all fit and priority funcs.
nodeNameToInfo, err := g.cache.GetNodeNameToInfoMap()
if err != nil {
return "", err
}
nodeNameToInfo := schedulercache.CreateNodeNameToInfoMap(pods)
filteredNodes, failedPredicateMap, err := findNodesThatFit(pod, nodeNameToInfo, g.predicates, nodes, g.extenders)
if err != nil {
return "", err
@ -96,7 +95,7 @@ func (g *genericScheduler) Schedule(pod *api.Pod, nodeLister algorithm.NodeListe
}
}
priorityList, err := PrioritizeNodes(pod, nodeNameToInfo, g.pods, g.prioritizers, algorithm.FakeNodeLister(filteredNodes), g.extenders)
priorityList, err := PrioritizeNodes(pod, nodeNameToInfo, g.prioritizers, algorithm.FakeNodeLister(filteredNodes), g.extenders)
if err != nil {
return "", err
}
@ -185,7 +184,6 @@ func findNodesThatFit(pod *api.Pod, nodeNameToInfo map[string]*schedulercache.No
func PrioritizeNodes(
pod *api.Pod,
nodeNameToInfo map[string]*schedulercache.NodeInfo,
podLister algorithm.PodLister,
priorityConfigs []algorithm.PriorityConfig,
nodeLister algorithm.NodeLister,
extenders []algorithm.SchedulerExtender,
@ -289,12 +287,12 @@ func EqualPriority(_ *api.Pod, nodeNameToInfo map[string]*schedulercache.NodeInf
return result, nil
}
func NewGenericScheduler(predicates map[string]algorithm.FitPredicate, prioritizers []algorithm.PriorityConfig, extenders []algorithm.SchedulerExtender, pods algorithm.PodLister, random *rand.Rand) algorithm.ScheduleAlgorithm {
func NewGenericScheduler(cache schedulercache.Cache, predicates map[string]algorithm.FitPredicate, prioritizers []algorithm.PriorityConfig, extenders []algorithm.SchedulerExtender, random *rand.Rand) algorithm.ScheduleAlgorithm {
return &genericScheduler{
cache: cache,
predicates: predicates,
prioritizers: prioritizers,
extenders: extenders,
pods: pods,
random: random,
}
}
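
The cache snapshot that replaces the per-Schedule pod re-list above could look roughly like this; only the method name, return type, and the nodes map come from this diff, while the mutex and the Clone helper are assumptions:

func (cache *schedulerCache) GetNodeNameToInfoMap() (map[string]*NodeInfo, error) {
	cache.mu.Lock()
	defer cache.mu.Unlock()
	nodeNameToInfo := make(map[string]*NodeInfo, len(cache.nodes))
	for name, info := range cache.nodes {
		// Copy so predicates and priorities never race with concurrent cache updates.
		nodeNameToInfo[name] = info.Clone()
	}
	return nodeNameToInfo, nil
}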

View File

@ -28,6 +28,7 @@ import (
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
schedulerapi "k8s.io/kubernetes/plugin/pkg/scheduler/api"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
schedulertesting "k8s.io/kubernetes/plugin/pkg/scheduler/testing"
)
func falsePredicate(pod *api.Pod, nodeName string, nodeInfo *schedulercache.NodeInfo) (bool, error) {
@ -256,7 +257,7 @@ func TestGenericScheduler(t *testing.T) {
for _, test := range tests {
random := rand.New(rand.NewSource(0))
scheduler := NewGenericScheduler(test.predicates, test.prioritizers, []algorithm.SchedulerExtender{}, algorithm.FakePodLister(test.pods), random)
scheduler := NewGenericScheduler(schedulertesting.PodsToCache(test.pods), test.predicates, test.prioritizers, []algorithm.SchedulerExtender{}, random)
machine, err := scheduler.Schedule(test.pod, algorithm.FakeNodeLister(makeNodeList(test.nodes)))
if test.expectsErr {
if err == nil {

View File

@ -1,197 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"fmt"
"strings"
"sync"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"github.com/golang/glog"
)
var (
_ = SystemModeler(&FakeModeler{})
_ = SystemModeler(&SimpleModeler{})
)
// ExtendedPodLister: SimpleModeler needs to be able to check for a pod's
// existence in addition to listing the pods.
type ExtendedPodLister interface {
algorithm.PodLister
Exists(pod *api.Pod) (bool, error)
}
// actionLocker implements lockedAction (so the fake and SimpleModeler can both
// use it)
type actionLocker struct {
sync.Mutex
}
// LockedAction serializes calls of whatever is passed as 'do'.
func (a *actionLocker) LockedAction(do func()) {
a.Lock()
defer a.Unlock()
do()
}
// FakeModeler implements the SystemModeler interface.
type FakeModeler struct {
AssumePodFunc func(pod *api.Pod)
ForgetPodFunc func(pod *api.Pod)
ForgetPodByKeyFunc func(key string)
actionLocker
}
// AssumePod calls the function variable if it is not nil.
func (f *FakeModeler) AssumePod(pod *api.Pod) {
if f.AssumePodFunc != nil {
f.AssumePodFunc(pod)
}
}
// ForgetPod calls the function variable if it is not nil.
func (f *FakeModeler) ForgetPod(pod *api.Pod) {
if f.ForgetPodFunc != nil {
f.ForgetPodFunc(pod)
}
}
// ForgetPodByKey calls the function variable if it is not nil.
func (f *FakeModeler) ForgetPodByKey(key string) {
if f.ForgetPodFunc != nil {
f.ForgetPodByKeyFunc(key)
}
}
// SimpleModeler implements the SystemModeler interface with a timed pod cache.
type SimpleModeler struct {
queuedPods ExtendedPodLister
scheduledPods ExtendedPodLister
// assumedPods holds the pods that we think we've scheduled, but that
// haven't yet shown up in the scheduledPods variable.
// TODO: periodically clear this.
assumedPods *cache.StoreToPodLister
actionLocker
}
// NewSimpleModeler returns a new SimpleModeler.
// queuedPods: a PodLister that will return pods that have not been scheduled yet.
// scheduledPods: a PodLister that will return pods that we know for sure have been scheduled.
func NewSimpleModeler(queuedPods, scheduledPods ExtendedPodLister) *SimpleModeler {
return &SimpleModeler{
queuedPods: queuedPods,
scheduledPods: scheduledPods,
assumedPods: &cache.StoreToPodLister{
Store: cache.NewTTLStore(cache.MetaNamespaceKeyFunc, 30*time.Second),
},
}
}
func (s *SimpleModeler) AssumePod(pod *api.Pod) {
s.assumedPods.Add(pod)
}
func (s *SimpleModeler) ForgetPod(pod *api.Pod) {
s.assumedPods.Delete(pod)
}
func (s *SimpleModeler) ForgetPodByKey(key string) {
s.assumedPods.Delete(cache.ExplicitKey(key))
}
// Extract names for readable logging.
func podNames(pods []*api.Pod) []string {
out := make([]string, len(pods))
for i := range pods {
out[i] = fmt.Sprintf("'%v/%v (%v)'", pods[i].Namespace, pods[i].Name, pods[i].UID)
}
return out
}
func (s *SimpleModeler) listPods(selector labels.Selector) (pods []*api.Pod, err error) {
assumed, err := s.assumedPods.List(selector)
if err != nil {
return nil, err
}
// Since the assumed list will be short, just check every one.
// Goal here is to stop making assumptions about a pod once it shows
// up in one of these other lists.
for _, pod := range assumed {
qExist, err := s.queuedPods.Exists(pod)
if err != nil {
return nil, err
}
if qExist {
s.assumedPods.Store.Delete(pod)
continue
}
sExist, err := s.scheduledPods.Exists(pod)
if err != nil {
return nil, err
}
if sExist {
s.assumedPods.Store.Delete(pod)
continue
}
}
scheduled, err := s.scheduledPods.List(selector)
if err != nil {
return nil, err
}
// Listing purges the ttl cache and re-gets, in case we deleted any entries.
assumed, err = s.assumedPods.List(selector)
if err != nil {
return nil, err
}
if len(assumed) == 0 {
return scheduled, nil
}
glog.V(2).Infof(
"listing pods: [%v] assumed to exist in addition to %v known pods.",
strings.Join(podNames(assumed), ","),
len(scheduled),
)
return append(scheduled, assumed...), nil
}
// PodLister returns a PodLister that will list pods that we think we have scheduled in
// addition to pods that we know have been scheduled.
func (s *SimpleModeler) PodLister() algorithm.PodLister {
return simpleModelerPods{s}
}
// simpleModelerPods is an adaptor so that SimpleModeler can be a PodLister.
type simpleModelerPods struct {
simpleModeler *SimpleModeler
}
// List returns pods known and assumed to exist.
func (s simpleModelerPods) List(selector labels.Selector) (pods []*api.Pod, err error) {
s.simpleModeler.LockedAction(
func() { pods, err = s.simpleModeler.listPods(selector) })
return
}

View File

@ -1,111 +0,0 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package scheduler
import (
"testing"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/labels"
)
type nn struct {
namespace, name string
}
type names []nn
func (ids names) list() []*api.Pod {
out := make([]*api.Pod, 0, len(ids))
for _, id := range ids {
out = append(out, &api.Pod{
ObjectMeta: api.ObjectMeta{
Namespace: id.namespace,
Name: id.name,
},
})
}
return out
}
func (ids names) has(pod *api.Pod) bool {
for _, id := range ids {
if pod.Namespace == id.namespace && pod.Name == id.name {
return true
}
}
return false
}
func TestModeler(t *testing.T) {
table := []struct {
queuedPods []*api.Pod
scheduledPods []*api.Pod
assumedPods []*api.Pod
expectPods names
}{
{
queuedPods: names{}.list(),
scheduledPods: names{{"default", "foo"}, {"custom", "foo"}}.list(),
assumedPods: names{{"default", "foo"}}.list(),
expectPods: names{{"default", "foo"}, {"custom", "foo"}},
}, {
queuedPods: names{}.list(),
scheduledPods: names{{"default", "foo"}}.list(),
assumedPods: names{{"default", "foo"}, {"custom", "foo"}}.list(),
expectPods: names{{"default", "foo"}, {"custom", "foo"}},
}, {
queuedPods: names{{"custom", "foo"}}.list(),
scheduledPods: names{{"default", "foo"}}.list(),
assumedPods: names{{"default", "foo"}, {"custom", "foo"}}.list(),
expectPods: names{{"default", "foo"}},
},
}
for _, item := range table {
q := &cache.StoreToPodLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}
for _, pod := range item.queuedPods {
q.Store.Add(pod)
}
s := &cache.StoreToPodLister{Store: cache.NewStore(cache.MetaNamespaceKeyFunc)}
for _, pod := range item.scheduledPods {
s.Store.Add(pod)
}
m := NewSimpleModeler(q, s)
for _, pod := range item.assumedPods {
m.AssumePod(pod)
}
list, err := m.PodLister().List(labels.Everything())
if err != nil {
t.Errorf("unexpected error: %v", err)
}
found := 0
for _, pod := range list {
if item.expectPods.has(pod) {
found++
} else {
t.Errorf("found unexpected pod %#v", pod)
}
}
if e, a := item.expectPods, found; len(e) != a {
t.Errorf("Expected pods:\n%+v\nFound pods:\n%s\n", podNames(e.list()), podNames(list))
}
}
}

View File

@ -27,6 +27,7 @@ import (
"k8s.io/kubernetes/pkg/util/wait"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"k8s.io/kubernetes/plugin/pkg/scheduler/metrics"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
"github.com/golang/glog"
)
@ -36,32 +37,6 @@ type Binder interface {
Bind(binding *api.Binding) error
}
// SystemModeler can help scheduler produce a model of the system that
// anticipates reality. For example, if scheduler has pods A and B both
// using hostPort 80, when it binds A to machine M it should not bind B
// to machine M in the time when it hasn't observed the binding of A
// take effect yet.
//
// Since the model is only an optimization, it's expected to handle
// any errors itself without sending them back to the scheduler.
type SystemModeler interface {
// AssumePod assumes that the given pod exists in the system.
// The assumption should last until the system confirms the
// assumption or disconfirms it.
AssumePod(pod *api.Pod)
// ForgetPod removes a pod assumption. (It won't make the model
// show the absence of the given pod if the pod is in the scheduled
// pods list!)
ForgetPod(pod *api.Pod)
ForgetPodByKey(key string)
// For serializing calls to Assume/ForgetPod: imagine you want to add
// a pod if and only if a bind succeeds, but also remove a pod if it is deleted.
// TODO: if SystemModeler begins modeling things other than pods, this
// should probably be parameterized or specialized for pods.
LockedAction(f func())
}
// Scheduler watches for new unscheduled pods. It attempts to find
// nodes that they fit on and writes bindings back to the api server.
type Scheduler struct {
@ -69,12 +44,12 @@ type Scheduler struct {
}
type Config struct {
// It is expected that changes made via modeler will be observed
// It is expected that changes made via SchedulerCache will be observed
// by NodeLister and Algorithm.
Modeler SystemModeler
NodeLister algorithm.NodeLister
Algorithm algorithm.ScheduleAlgorithm
Binder Binder
SchedulerCache schedulercache.Cache
NodeLister algorithm.NodeLister
Algorithm algorithm.ScheduleAlgorithm
Binder Binder
// NextPod should be a function that blocks until the next pod
// is available. We don't use a channel for this, because scheduling
@ -129,24 +104,25 @@ func (s *Scheduler) scheduleOne() {
},
}
// We want to add the pod to the model if and only if the bind succeeds,
// but we don't want to race with any deletions, which happen asynchronously.
s.config.Modeler.LockedAction(func() {
bindAction := func() bool {
bindingStart := time.Now()
err := s.config.Binder.Bind(b)
if err != nil {
glog.V(1).Infof("Failed to bind pod: %+v", err)
s.config.Recorder.Eventf(pod, api.EventTypeNormal, "FailedScheduling", "Binding rejected: %v", err)
s.config.Error(pod, err)
return
return false
}
metrics.BindingLatency.Observe(metrics.SinceInMicroseconds(bindingStart))
s.config.Recorder.Eventf(pod, api.EventTypeNormal, "Scheduled", "Successfully assigned %v to %v", pod.Name, dest)
// tell the model to assume that this binding took effect.
assumed := *pod
assumed.Spec.NodeName = dest
s.config.Modeler.AssumePod(&assumed)
})
return true
}
assumed := *pod
assumed.Spec.NodeName = dest
// We want to assume the pod if and only if the bind succeeds,
// but we don't want to race with any deletions, which happen asynchronously.
s.config.SchedulerCache.AssumePodIfBindSucceed(&assumed, bindAction)
metrics.E2eSchedulingLatency.Observe(metrics.SinceInMicroseconds(start))
}
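
The exported AssumePodIfBindSucceed called above is not shown in this diff. A minimal sketch of the contract, assuming the cache serializes the bind and any concurrent RemovePod behind a single lock (the lower-case assumePodIfBindSucceed it defers to appears in the cache diff further down):

func (cache *schedulerCache) AssumePodIfBindSucceed(pod *api.Pod, bind func() bool) error {
	cache.mu.Lock()
	defer cache.mu.Unlock()
	// bind runs under the cache lock, so a delete observed by the informer cannot
	// slip in between a successful bind and recording the assumption.
	// Whether the timestamp is passed in or taken inside the helper is an assumption.
	return cache.assumePodIfBindSucceed(pod, bind, time.Now())
}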

View File

@ -29,9 +29,12 @@ import (
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/client/cache"
"k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/pkg/util"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm"
"k8s.io/kubernetes/plugin/pkg/scheduler/algorithm/predicates"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
schedulertesting "k8s.io/kubernetes/plugin/pkg/scheduler/testing"
)
type fakeBinder struct {
@ -111,8 +114,8 @@ func TestScheduler(t *testing.T) {
var gotAssumedPod *api.Pod
var gotBinding *api.Binding
c := &Config{
Modeler: &FakeModeler{
AssumePodFunc: func(pod *api.Pod) {
SchedulerCache: &schedulertesting.FakeCache{
AssumeFunc: func(pod *api.Pod) {
gotAssumedPod = pod
},
},
@ -189,42 +192,30 @@ func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) {
wg.Wait()
}()
// Setup modeler so we control the contents of all 3 stores: assumed,
// scheduled and queued
// Set up stores to test the pod's workflow:
// - queuedPodStore: pods queued before processing
// - scheduledPodStore: pods that have a scheduling decision
scheduledPodStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
scheduledPodLister := &cache.StoreToPodLister{Store: scheduledPodStore}
queuedPodStore := cache.NewFIFO(cache.MetaNamespaceKeyFunc)
queuedPodLister := &cache.StoreToPodLister{Store: queuedPodStore}
modeler := NewSimpleModeler(queuedPodLister, scheduledPodLister)
// Create a fake clock used to timestamp entries and calculate ttl. Nothing
// will expire till we flip to something older than the ttl, at which point
// all entries inserted with fakeTime will expire.
ttl := 30 * time.Second
fakeTime := time.Date(2009, time.November, 10, 23, 0, 0, 0, time.UTC)
fakeClock := util.NewFakeClock(fakeTime)
ttlPolicy := &cache.TTLPolicy{Ttl: ttl, Clock: fakeClock}
assumedPodsStore := cache.NewFakeExpirationStore(
cache.MetaNamespaceKeyFunc, nil, ttlPolicy, fakeClock)
modeler.assumedPods = &cache.StoreToPodLister{Store: assumedPodsStore}
// Port is the easiest way to cause a fit predicate failure
podPort := 8080
firstPod := podWithPort("foo", "", podPort)
stop := make(chan struct{})
defer close(stop)
cache := schedulercache.New(1*time.Second, stop)
// Create the scheduler config
algo := NewGenericScheduler(
cache,
map[string]algorithm.FitPredicate{"PodFitsHostPorts": predicates.PodFitsHostPorts},
[]algorithm.PriorityConfig{},
[]algorithm.SchedulerExtender{},
modeler.PodLister(),
rand.New(rand.NewSource(time.Now().UnixNano())))
var gotBinding *api.Binding
c := &Config{
Modeler: modeler,
SchedulerCache: cache,
NodeLister: algorithm.FakeNodeLister(
api.NodeList{Items: []api.Node{{ObjectMeta: api.ObjectMeta{Name: "machine1"}}}},
),
@ -271,10 +262,6 @@ func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) {
if exists {
t.Errorf("Did not expect a queued pod, found %+v", pod)
}
pod, exists, _ = assumedPodsStore.GetByKey("foo")
if !exists {
t.Errorf("Assumed pod store should contain stale pod")
}
expectBind := &api.Binding{
ObjectMeta: api.ObjectMeta{Name: "foo"},
@ -288,10 +275,6 @@ func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) {
events.Stop()
scheduledPodStore.Delete(pod)
_, exists, _ = assumedPodsStore.Get(pod)
if !exists {
t.Errorf("Expected pod %#v in assumed pod store", pod)
}
secondPod := podWithPort("bar", "", podPort)
queuedPodStore.Add(secondPod)
@ -299,10 +282,26 @@ func TestSchedulerForgetAssumedPodAfterDelete(t *testing.T) {
// scheduledPodStore: []
// assumedPods: [foo:8080]
var waitUntilExpired sync.WaitGroup
waitUntilExpired.Add(1)
// waiting for the assumed pod to expire
go func() {
for {
pods, err := cache.List(labels.Everything())
if err != nil {
t.Fatalf("cache.List failed: %v", err)
}
if len(pods) == 0 {
waitUntilExpired.Done()
return
}
time.Sleep(1 * time.Second)
}
}()
waitUntilExpired.Wait()
// Second scheduling pass will fail to schedule if the store hasn't expired
// the deleted pod. This would normally happen with a timeout.
//expirationPolicy.NeverExpire = util.NewStringSet()
fakeClock.Step(ttl + 1)
called = make(chan struct{})
events = eventBroadcaster.StartEventWatcher(func(e *api.Event) {

View File

@ -59,7 +59,7 @@ type schedulerCache struct {
type podState struct {
pod *api.Pod
// Used by assumedPod to determine expiration.
deadline time.Time
deadline *time.Time
}
func newSchedulerCache(ttl, period time.Duration, stop chan struct{}) *schedulerCache {
@ -120,9 +120,10 @@ func (cache *schedulerCache) assumePodIfBindSucceed(pod *api.Pod, bind func() bo
}
cache.addPod(pod)
dl := now.Add(cache.ttl)
ps := &podState{
pod: pod,
deadline: now.Add(cache.ttl),
deadline: &dl,
}
cache.podStates[key] = ps
cache.assumedPods[key] = true
@ -142,9 +143,14 @@ func (cache *schedulerCache) AddPod(pod *api.Pod) error {
switch {
case ok && cache.assumedPods[key]:
delete(cache.assumedPods, key)
cache.podStates[key].deadline = nil
case !ok:
// Pod was expired. We should add it back.
cache.addPod(pod)
ps := &podState{
pod: pod,
}
cache.podStates[key] = ps
default:
return fmt.Errorf("pod was already in added state. Pod key: %v", key)
}
@ -246,7 +252,7 @@ func (cache *schedulerCache) cleanupAssumedPods(now time.Time) {
if !ok {
panic("Key found in assumed set but not in podStates. Potentially a logical error.")
}
if now.After(ps.deadline) {
if now.After(*ps.deadline) {
if err := cache.expirePod(key, ps); err != nil {
glog.Errorf(" expirePod failed for %s: %v", key, err)
}
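
The switch to a *time.Time deadline means a confirmed pod carries no deadline and is also dropped from the assumed set, so cleanupAssumedPods never touches it. An illustrative sequence, reusing helpers that appear in the cache tests below (makeBasePod, alwaysTrue); this is a sketch, not test code from the commit:

cache := newSchedulerCache(10*time.Second, time.Second, nil)
pod := makeBasePod("node1", "p1", "100m", "500", nil)

// Assumed: deadline is set, so the pod would expire after the ttl.
_ = cache.AssumePodIfBindSucceed(pod, alwaysTrue)

// Confirmed by the informer's Add event: deadline cleared, key removed from assumedPods.
_ = cache.AddPod(pod)

// Even far past the ttl, the confirmed pod stays in the cache.
cache.cleanupAssumedPods(time.Now().Add(time.Hour))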

View File

@ -274,7 +274,71 @@ func TestUpdatePod(t *testing.T) {
podsToUpdate []*api.Pod
wNodeInfo []*NodeInfo
}{{ // Pod is assumed and added. Then it would be updated twice.
}{{ // add a pod and then update it twice
podsToAdd: []*api.Pod{testPods[0]},
podsToUpdate: []*api.Pod{testPods[0], testPods[1], testPods[0]},
wNodeInfo: []*NodeInfo{{
requestedResource: &Resource{
MilliCPU: 200,
Memory: 1024,
},
nonzeroRequest: &Resource{
MilliCPU: 200,
Memory: 1024,
},
pods: []*api.Pod{testPods[1]},
}, {
requestedResource: &Resource{
MilliCPU: 100,
Memory: 500,
},
nonzeroRequest: &Resource{
MilliCPU: 100,
Memory: 500,
},
pods: []*api.Pod{testPods[0]},
}},
}}
for _, tt := range tests {
cache := newSchedulerCache(ttl, time.Second, nil)
for _, podToAdd := range tt.podsToAdd {
if err := cache.AddPod(podToAdd); err != nil {
t.Fatalf("AddPod failed: %v", err)
}
}
for i := range tt.podsToUpdate {
if i == 0 {
continue
}
if err := cache.UpdatePod(tt.podsToUpdate[i-1], tt.podsToUpdate[i]); err != nil {
t.Fatalf("UpdatePod failed: %v", err)
}
// check after expiration. confirmed pods shouldn't be expired.
n := cache.nodes[nodeName]
if !reflect.DeepEqual(n, tt.wNodeInfo[i-1]) {
t.Errorf("#%d: node info get=%s, want=%s", i-1, n, tt.wNodeInfo)
}
}
}
}
// TestExpireAddUpdatePod tests the sequence in which a pod is expired, added, and then updated.
func TestExpireAddUpdatePod(t *testing.T) {
nodeName := "node"
ttl := 10 * time.Second
testPods := []*api.Pod{
makeBasePod(nodeName, "test", "100m", "500", []api.ContainerPort{{HostPort: 80}}),
makeBasePod(nodeName, "test", "200m", "1Ki", []api.ContainerPort{{HostPort: 8080}}),
}
tests := []struct {
podsToAssume []*api.Pod
podsToAdd []*api.Pod
podsToUpdate []*api.Pod
wNodeInfo []*NodeInfo
}{{ // Pod is assumed, expired, and added. Then it would be updated twice.
podsToAssume: []*api.Pod{testPods[0]},
podsToAdd: []*api.Pod{testPods[0]},
podsToUpdate: []*api.Pod{testPods[0], testPods[1], testPods[0]},
@ -309,6 +373,8 @@ func TestUpdatePod(t *testing.T) {
t.Fatalf("assumePod failed: %v", err)
}
}
cache.cleanupAssumedPods(now.Add(2 * ttl))
for _, podToAdd := range tt.podsToAdd {
if err := cache.AddPod(podToAdd); err != nil {
t.Fatalf("AddPod failed: %v", err)
@ -356,9 +422,6 @@ func TestRemovePod(t *testing.T) {
for i, tt := range tests {
cache := newSchedulerCache(time.Second, time.Second, nil)
if err := cache.AssumePodIfBindSucceed(tt.pod, alwaysTrue); err != nil {
t.Fatalf("assumePod failed: %v", err)
}
if err := cache.AddPod(tt.pod); err != nil {
t.Fatalf("AddPod failed: %v", err)
}
@ -449,12 +512,7 @@ func setupCacheOf1kNodes30kPods(b *testing.B) Cache {
objName := fmt.Sprintf("%s-pod-%d", nodeName, j)
pod := makeBasePod(nodeName, objName, "0", "0", nil)
err := cache.AssumePodIfBindSucceed(pod, alwaysTrue)
if err != nil {
b.Fatalf("AssumePodIfBindSucceed failed: %v", err)
}
err = cache.AddPod(pod)
if err != nil {
if err := cache.AddPod(pod); err != nil {
b.Fatalf("AddPod failed: %v", err)
}
}

View File

@ -23,32 +23,35 @@ import (
// Cache collects pods' information and provides node-level aggregated information.
// It's intended for generic scheduler to do efficient lookup.
// Cache's operations are pod centric. It incrementally updates itself based on pod events.
// Cache's operations are pod centric. It does incremental updates based on pod events.
// Pod events are sent via network. We don't have guaranteed delivery of all events:
// We use Reflector to list and watch from remote.
// Reflector might be slow and do a relist, which would lead to missing events.
//
// State Machine of a pod's events in scheduler's cache:
//
// +-------+
// | |
// | | Update
// Assume Add + |
// Initial +--------> Assumed +------------+---> Added <--+
// + | +
// | | |
// | Add | | Remove
// | | |
// | + |
// +-------------> Expired +----> Deleted
//
// +-------------------------------------------+ +----+
// | Add | | |
// | | | | Update
// + Assume Add v v |
//Initial +--------> Assumed +------------+---> Added <--+
// + | +
// | | |
// | Add | | Remove
// | | |
// | + |
// +-------------> Expired +----> Deleted
// Expire
//
//
// Note that an assumed pod can expire: if we haven't received an Add event notifying us
// for a while, something may have gone wrong and we shouldn't keep the pod in the cache anymore.
//
// Note that "Initial", "Expired", and "Deleted" pods do not actually exist in cache.
// Based on existing use cases, we are making the following assumptions:
// - No pod would be assumed twice
// - A pod could be added without going through scheduler. In this case, we will see Add but not Assume event.
// - If a pod wasn't added, it wouldn't be removed or updated.
// - Both "Expired" and "Deleted" are valid end states. In case of some problems, e.g. network issue,
// a pod might have changed its state (e.g. added and deleted) without delivering notification to the cache.
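
For reference, the method set this doc comment describes, as implied by the fake implementations below and the call sites above; the comments here paraphrase the state machine rather than quote the real interface:

type Cache interface {
	// AssumePodIfBindSucceed invokes bind and records the pod as assumed only if bind returns true.
	AssumePodIfBindSucceed(pod *api.Pod, bind func() bool) error
	// AddPod either confirms an assumed pod (it will no longer expire) or re-adds an expired one.
	AddPod(pod *api.Pod) error
	// UpdatePod replaces oldPod's accounting with newPod's.
	UpdatePod(oldPod, newPod *api.Pod) error
	// RemovePod removes an added pod and drops its resources from the node's aggregate.
	RemovePod(pod *api.Pod) error
	// GetNodeNameToInfoMap returns the per-node aggregates consumed by predicates and priorities.
	GetNodeNameToInfoMap() (map[string]*NodeInfo, error)
	// List makes the cache usable as the factory's PodLister.
	List(labels.Selector) ([]*api.Pod, error)
}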

View File

@ -0,0 +1,48 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package schedulercache
import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)
// FakeCache is used for testing
type FakeCache struct {
AssumeFunc func(*api.Pod)
}
func (f *FakeCache) AssumePodIfBindSucceed(pod *api.Pod, bind func() bool) error {
if !bind() {
return nil
}
f.AssumeFunc(pod)
return nil
}
func (f *FakeCache) AddPod(pod *api.Pod) error { return nil }
func (f *FakeCache) UpdatePod(oldPod, newPod *api.Pod) error { return nil }
func (f *FakeCache) RemovePod(pod *api.Pod) error { return nil }
func (f *FakeCache) GetNodeNameToInfoMap() (map[string]*schedulercache.NodeInfo, error) {
return nil, nil
}
func (f *FakeCache) List(s labels.Selector) ([]*api.Pod, error) { return nil, nil }

View File

@ -0,0 +1,52 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package schedulercache
import (
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/labels"
"k8s.io/kubernetes/plugin/pkg/scheduler/schedulercache"
)
// PodsToCache is used for testing
type PodsToCache []*api.Pod
func (p PodsToCache) AssumePodIfBindSucceed(pod *api.Pod, bind func() bool) error {
if !bind() {
return nil
}
return nil
}
func (p PodsToCache) AddPod(pod *api.Pod) error { return nil }
func (p PodsToCache) UpdatePod(oldPod, newPod *api.Pod) error { return nil }
func (p PodsToCache) RemovePod(pod *api.Pod) error { return nil }
func (p PodsToCache) GetNodeNameToInfoMap() (map[string]*schedulercache.NodeInfo, error) {
return schedulercache.CreateNodeNameToInfoMap(p), nil
}
func (p PodsToCache) List(s labels.Selector) (selected []*api.Pod, err error) {
for _, pod := range p {
if s.Matches(labels.Set(pod.Labels)) {
selected = append(selected, pod)
}
}
return selected, nil
}
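
Typical usage, mirroring the generic scheduler and extender tests earlier in this commit (testPods, predicates, prioritizers, and random stand in for test fixtures):

cache := schedulertesting.PodsToCache(testPods)
scheduler := NewGenericScheduler(cache, predicates, prioritizers, []algorithm.SchedulerExtender{}, random)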