mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-31 07:20:13 +00:00
Add a system modeler to scheduler
So it can try to predict the effect its bindings will have.
This commit is contained in:
parent
6319f8a568
commit
992d78a32e
9
pkg/client/cache/listers.go
vendored
9
pkg/client/cache/listers.go
vendored
@ -52,6 +52,15 @@ func (s *StoreToPodLister) List(selector labels.Selector) (pods []api.Pod, err e
|
||||
return pods, nil
|
||||
}
|
||||
|
||||
// Exists returns true if a pod matching the namespace/name of the given pod exists in the store.
|
||||
func (s *StoreToPodLister) Exists(pod *api.Pod) (bool, error) {
|
||||
_, exists, err := s.Store.Get(pod)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return exists, nil
|
||||
}
|
||||
|
||||
// StoreToNodeLister makes a Store have the List method of the client.NodeInterface
|
||||
// The Store must contain (only) Nodes.
|
||||
type StoreToNodeLister struct {
|
||||
|
16
pkg/client/cache/listers_test.go
vendored
16
pkg/client/cache/listers_test.go
vendored
@ -72,5 +72,21 @@ func TestStoreToPodLister(t *testing.T) {
|
||||
t.Errorf("Expected %v, got %v", e, a)
|
||||
continue
|
||||
}
|
||||
|
||||
exists, err := spl.Exists(&api.Pod{ObjectMeta: api.ObjectMeta{Name: id}})
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
if !exists {
|
||||
t.Errorf("exists returned false for %v", id)
|
||||
}
|
||||
}
|
||||
|
||||
exists, err := spl.Exists(&api.Pod{ObjectMeta: api.ObjectMeta{Name: "qux"}})
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
if exists {
|
||||
t.Errorf("Unexpected pod exists")
|
||||
}
|
||||
}
|
||||
|
@ -342,6 +342,15 @@ func MapPodsToMachines(lister PodLister) (map[string][]api.Pod, error) {
|
||||
return map[string][]api.Pod{}, err
|
||||
}
|
||||
for _, scheduledPod := range pods {
|
||||
// TODO: switch to Spec.Host! There was some confusion previously
|
||||
// about whether components should judge a pod's location
|
||||
// based on spec.Host or status.Host. It has been decided that
|
||||
// spec.Host is the canonical location of the pod. Status.Host
|
||||
// will either be removed, be a copy, or in theory it could be
|
||||
// used as a signal that kubelet has agreed to run the pod.
|
||||
//
|
||||
// This could be fixed now, but just requires someone to try it
|
||||
// and verify that e2e still passes.
|
||||
host := scheduledPod.Status.Host
|
||||
machineToPods[host] = append(machineToPods[host], scheduledPod)
|
||||
}
|
||||
|
@ -41,23 +41,31 @@ type ConfigFactory struct {
|
||||
Client *client.Client
|
||||
// queue for pods that need scheduling
|
||||
PodQueue *cache.FIFO
|
||||
// a means to list all scheduled pods
|
||||
PodLister *cache.StoreToPodLister
|
||||
// a means to list all known scheduled pods.
|
||||
ScheduledPodLister *cache.StoreToPodLister
|
||||
// a means to list all known scheduled pods and pods assumed to have been scheduled.
|
||||
PodLister algorithm.PodLister
|
||||
// a means to list all minions
|
||||
NodeLister *cache.StoreToNodeLister
|
||||
// a means to list all services
|
||||
ServiceLister *cache.StoreToServiceLister
|
||||
|
||||
modeler scheduler.SystemModeler
|
||||
}
|
||||
|
||||
// Initializes the factory.
|
||||
func NewConfigFactory(client *client.Client) *ConfigFactory {
|
||||
return &ConfigFactory{
|
||||
Client: client,
|
||||
PodQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc),
|
||||
PodLister: &cache.StoreToPodLister{cache.NewStore(cache.MetaNamespaceKeyFunc)},
|
||||
NodeLister: &cache.StoreToNodeLister{cache.NewStore(cache.MetaNamespaceKeyFunc)},
|
||||
ServiceLister: &cache.StoreToServiceLister{cache.NewStore(cache.MetaNamespaceKeyFunc)},
|
||||
c := &ConfigFactory{
|
||||
Client: client,
|
||||
PodQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc),
|
||||
ScheduledPodLister: &cache.StoreToPodLister{cache.NewStore(cache.MetaNamespaceKeyFunc)},
|
||||
NodeLister: &cache.StoreToNodeLister{cache.NewStore(cache.MetaNamespaceKeyFunc)},
|
||||
ServiceLister: &cache.StoreToServiceLister{cache.NewStore(cache.MetaNamespaceKeyFunc)},
|
||||
}
|
||||
modeler := scheduler.NewSimpleModeler(&cache.StoreToPodLister{c.PodQueue}, c.ScheduledPodLister)
|
||||
c.modeler = modeler
|
||||
c.PodLister = modeler.PodLister()
|
||||
return c
|
||||
}
|
||||
|
||||
// Create creates a scheduler with the default algorithm provider.
|
||||
@ -118,7 +126,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe
|
||||
|
||||
// Watch and cache all running pods. Scheduler needs to find all pods
|
||||
// so it knows where it's safe to place a pod. Cache this locally.
|
||||
cache.NewReflector(f.createAssignedPodLW(), &api.Pod{}, f.PodLister.Store, 0).Run()
|
||||
cache.NewReflector(f.createAssignedPodLW(), &api.Pod{}, f.ScheduledPodLister.Store, 0).Run()
|
||||
|
||||
// Watch minions.
|
||||
// Minions may be listed frequently, so provide a local up-to-date cache.
|
||||
@ -148,6 +156,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe
|
||||
}
|
||||
|
||||
return &scheduler.Config{
|
||||
Modeler: f.modeler,
|
||||
MinionLister: f.NodeLister,
|
||||
Algorithm: algo,
|
||||
Binder: &binder{f.Client},
|
||||
|
155
plugin/pkg/scheduler/modeler.go
Normal file
155
plugin/pkg/scheduler/modeler.go
Normal file
@ -0,0 +1,155 @@
|
||||
/*
|
||||
Copyright 2015 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package scheduler
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||
algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
|
||||
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
var (
|
||||
_ = SystemModeler(&FakeModeler{})
|
||||
_ = SystemModeler(&SimpleModeler{})
|
||||
)
|
||||
|
||||
// ExtendedPodLister: SimpleModeler needs to be able to check for a pod's
|
||||
// existance in addition to listing the pods.
|
||||
type ExtendedPodLister interface {
|
||||
algorithm.PodLister
|
||||
Exists(pod *api.Pod) (bool, error)
|
||||
}
|
||||
|
||||
// FakeModeler implements the SystemModeler interface.
|
||||
type FakeModeler struct {
|
||||
AssumePodFunc func(pod *api.Pod)
|
||||
}
|
||||
|
||||
// AssumePod calls the function variable if it is not nil.
|
||||
func (f *FakeModeler) AssumePod(pod *api.Pod) {
|
||||
if f.AssumePodFunc != nil {
|
||||
f.AssumePodFunc(pod)
|
||||
}
|
||||
}
|
||||
|
||||
// SimpleModeler implements the SystemModeler interface with a timed pod cache.
|
||||
type SimpleModeler struct {
|
||||
queuedPods ExtendedPodLister
|
||||
scheduledPods ExtendedPodLister
|
||||
|
||||
// assumedPods holds the pods that we think we've scheduled, but that
|
||||
// haven't yet shown up in the scheduledPods variable.
|
||||
// TODO: periodically clear this.
|
||||
assumedPods *cache.StoreToPodLister
|
||||
}
|
||||
|
||||
// NewSimpleModeler returns a new SimpleModeler.
|
||||
// queuedPods: a PodLister that will return pods that have not been scheduled yet.
|
||||
// scheduledPods: a PodLister that will return pods that we know for sure have been scheduled.
|
||||
func NewSimpleModeler(queuedPods, scheduledPods ExtendedPodLister) *SimpleModeler {
|
||||
return &SimpleModeler{
|
||||
queuedPods: queuedPods,
|
||||
scheduledPods: scheduledPods,
|
||||
assumedPods: &cache.StoreToPodLister{cache.NewStore(cache.MetaNamespaceKeyFunc)},
|
||||
}
|
||||
}
|
||||
|
||||
func (s *SimpleModeler) AssumePod(pod *api.Pod) {
|
||||
s.assumedPods.Add(pod)
|
||||
}
|
||||
|
||||
// Extract names for readable logging.
|
||||
func podNames(pods []api.Pod) []string {
|
||||
out := make([]string, len(pods))
|
||||
for i := range pods {
|
||||
out[i] = fmt.Sprintf("'%v/%v (%v)'", pods[i].Namespace, pods[i].Name, pods[i].UID)
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func (s *SimpleModeler) listPods(selector labels.Selector) (pods []api.Pod, err error) {
|
||||
assumed, err := s.assumedPods.List(selector)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// Since the assumed list will be short, just check every one.
|
||||
// Goal here is to stop making assumptions about a pod once it shows
|
||||
// up in one of these other lists.
|
||||
// TODO: there's a possibility that a pod could get deleted at the
|
||||
// exact wrong time and linger in assumedPods forever. So we
|
||||
// need go through that periodically and check for deleted
|
||||
// pods.
|
||||
for _, pod := range assumed {
|
||||
qExist, err := s.queuedPods.Exists(&pod)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if qExist {
|
||||
s.assumedPods.Store.Delete(&pod)
|
||||
continue
|
||||
}
|
||||
sExist, err := s.scheduledPods.Exists(&pod)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if sExist {
|
||||
s.assumedPods.Store.Delete(&pod)
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
scheduled, err := s.scheduledPods.List(selector)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
// re-get in case we deleted any.
|
||||
assumed, err = s.assumedPods.List(selector)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if len(assumed) == 0 {
|
||||
return scheduled, nil
|
||||
}
|
||||
glog.V(2).Infof(
|
||||
"listing pods: [%v] assumed to exist in addition to %v known pods.",
|
||||
strings.Join(podNames(assumed), ","),
|
||||
len(scheduled),
|
||||
)
|
||||
return append(scheduled, assumed...), nil
|
||||
}
|
||||
|
||||
// PodLister returns a PodLister that will list pods that we think we have scheduled in
|
||||
// addition to pods that we know have been scheduled.
|
||||
func (s *SimpleModeler) PodLister() algorithm.PodLister {
|
||||
return simpleModelerPods{s}
|
||||
}
|
||||
|
||||
// simpleModelerPods is an adaptor so that SimpleModeler can be a PodLister.
|
||||
type simpleModelerPods struct {
|
||||
simpleModeler *SimpleModeler
|
||||
}
|
||||
|
||||
// List returns pods known and assumed to exist.
|
||||
func (s simpleModelerPods) List(selector labels.Selector) (pods []api.Pod, err error) {
|
||||
return s.simpleModeler.listPods(selector)
|
||||
}
|
111
plugin/pkg/scheduler/modeler_test.go
Normal file
111
plugin/pkg/scheduler/modeler_test.go
Normal file
@ -0,0 +1,111 @@
|
||||
/*
|
||||
Copyright 2015 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package scheduler
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
|
||||
)
|
||||
|
||||
type nn struct {
|
||||
namespace, name string
|
||||
}
|
||||
|
||||
type names []nn
|
||||
|
||||
func (ids names) list() []api.Pod {
|
||||
out := make([]api.Pod, len(ids))
|
||||
for i, id := range ids {
|
||||
out[i] = api.Pod{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Namespace: id.namespace,
|
||||
Name: id.name,
|
||||
},
|
||||
}
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func (ids names) has(pod *api.Pod) bool {
|
||||
for _, id := range ids {
|
||||
if pod.Namespace == id.namespace && pod.Name == id.name {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func TestModeler(t *testing.T) {
|
||||
table := []struct {
|
||||
queuedPods []api.Pod
|
||||
scheduledPods []api.Pod
|
||||
assumedPods []api.Pod
|
||||
expectPods names
|
||||
}{
|
||||
{
|
||||
queuedPods: names{}.list(),
|
||||
scheduledPods: names{{"default", "foo"}, {"custom", "foo"}}.list(),
|
||||
assumedPods: names{{"default", "foo"}}.list(),
|
||||
expectPods: names{{"default", "foo"}, {"custom", "foo"}},
|
||||
}, {
|
||||
queuedPods: names{}.list(),
|
||||
scheduledPods: names{{"default", "foo"}}.list(),
|
||||
assumedPods: names{{"default", "foo"}, {"custom", "foo"}}.list(),
|
||||
expectPods: names{{"default", "foo"}, {"custom", "foo"}},
|
||||
}, {
|
||||
queuedPods: names{{"custom", "foo"}}.list(),
|
||||
scheduledPods: names{{"default", "foo"}}.list(),
|
||||
assumedPods: names{{"default", "foo"}, {"custom", "foo"}}.list(),
|
||||
expectPods: names{{"default", "foo"}},
|
||||
},
|
||||
}
|
||||
|
||||
for _, item := range table {
|
||||
q := &cache.StoreToPodLister{cache.NewStore(cache.MetaNamespaceKeyFunc)}
|
||||
for i := range item.queuedPods {
|
||||
q.Store.Add(&item.queuedPods[i])
|
||||
}
|
||||
s := &cache.StoreToPodLister{cache.NewStore(cache.MetaNamespaceKeyFunc)}
|
||||
for i := range item.scheduledPods {
|
||||
s.Store.Add(&item.scheduledPods[i])
|
||||
}
|
||||
m := NewSimpleModeler(q, s)
|
||||
for i := range item.assumedPods {
|
||||
m.AssumePod(&item.assumedPods[i])
|
||||
}
|
||||
|
||||
list, err := m.PodLister().List(labels.Everything())
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
|
||||
found := 0
|
||||
for _, pod := range list {
|
||||
if item.expectPods.has(&pod) {
|
||||
found++
|
||||
} else {
|
||||
t.Errorf("found unexpected pod %#v", pod)
|
||||
}
|
||||
}
|
||||
if e, a := item.expectPods, found; len(e) != a {
|
||||
t.Errorf("Expected pods:\n%+v\nFound pods:\n%v\n", e, list)
|
||||
}
|
||||
}
|
||||
}
|
@ -31,6 +31,21 @@ type Binder interface {
|
||||
Bind(binding *api.Binding) error
|
||||
}
|
||||
|
||||
// SystemModeler can help scheduler produce a model of the system that
|
||||
// anticipates reality. For example, if scheduler has pods A and B both
|
||||
// using hostPort 80, when it binds A to machine M it should not bind B
|
||||
// to machine M in the time when it hasn't observed the binding of A
|
||||
// take effect yet.
|
||||
//
|
||||
// Since the model is only an optimization, it's expected to handle
|
||||
// any errors itself without sending them back to the scheduler.
|
||||
type SystemModeler interface {
|
||||
// AssumePod assumes that the given pod exists in the system.
|
||||
// The assumtion should last until the system confirms the
|
||||
// assumtion or disconfirms it.
|
||||
AssumePod(pod *api.Pod)
|
||||
}
|
||||
|
||||
// Scheduler watches for new unscheduled pods. It attempts to find
|
||||
// minions that they fit on and writes bindings back to the api server.
|
||||
type Scheduler struct {
|
||||
@ -38,6 +53,9 @@ type Scheduler struct {
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
// It is expected that changes made via modeler will be observed
|
||||
// by MinionLister and Algorithm.
|
||||
Modeler SystemModeler
|
||||
MinionLister scheduler.MinionLister
|
||||
Algorithm scheduler.Scheduler
|
||||
Binder Binder
|
||||
@ -93,4 +111,9 @@ func (s *Scheduler) scheduleOne() {
|
||||
return
|
||||
}
|
||||
s.config.Recorder.Eventf(pod, "scheduled", "Successfully assigned %v to %v", pod.Name, dest)
|
||||
// tell the model to assume that this binding took effect.
|
||||
assumed := *pod
|
||||
assumed.Spec.Host = dest
|
||||
assumed.Status.Host = dest
|
||||
s.config.Modeler.AssumePod(&assumed)
|
||||
}
|
||||
|
@ -34,8 +34,16 @@ type fakeBinder struct {
|
||||
|
||||
func (fb fakeBinder) Bind(binding *api.Binding) error { return fb.b(binding) }
|
||||
|
||||
func podWithID(id string) *api.Pod {
|
||||
return &api.Pod{ObjectMeta: api.ObjectMeta{Name: id, SelfLink: testapi.SelfLink("pods", id)}}
|
||||
func podWithID(id, desiredHost string) *api.Pod {
|
||||
return &api.Pod{
|
||||
ObjectMeta: api.ObjectMeta{Name: id, SelfLink: testapi.SelfLink("pods", id)},
|
||||
Spec: api.PodSpec{
|
||||
Host: desiredHost,
|
||||
},
|
||||
Status: api.PodStatus{
|
||||
Host: desiredHost,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
type mockScheduler struct {
|
||||
@ -53,32 +61,34 @@ func TestScheduler(t *testing.T) {
|
||||
errB := errors.New("binder")
|
||||
|
||||
table := []struct {
|
||||
injectBindError error
|
||||
sendPod *api.Pod
|
||||
algo scheduler.Scheduler
|
||||
expectErrorPod *api.Pod
|
||||
expectError error
|
||||
expectBind *api.Binding
|
||||
eventReason string
|
||||
injectBindError error
|
||||
sendPod *api.Pod
|
||||
algo scheduler.Scheduler
|
||||
expectErrorPod *api.Pod
|
||||
expectAssumedPod *api.Pod
|
||||
expectError error
|
||||
expectBind *api.Binding
|
||||
eventReason string
|
||||
}{
|
||||
{
|
||||
sendPod: podWithID("foo"),
|
||||
algo: mockScheduler{"machine1", nil},
|
||||
expectBind: &api.Binding{ObjectMeta: api.ObjectMeta{Name: "foo"}, Target: api.ObjectReference{Kind: "Node", Name: "machine1"}},
|
||||
eventReason: "scheduled",
|
||||
sendPod: podWithID("foo", ""),
|
||||
algo: mockScheduler{"machine1", nil},
|
||||
expectBind: &api.Binding{ObjectMeta: api.ObjectMeta{Name: "foo"}, Target: api.ObjectReference{Kind: "Node", Name: "machine1"}},
|
||||
expectAssumedPod: podWithID("foo", "machine1"),
|
||||
eventReason: "scheduled",
|
||||
}, {
|
||||
sendPod: podWithID("foo"),
|
||||
sendPod: podWithID("foo", ""),
|
||||
algo: mockScheduler{"machine1", errS},
|
||||
expectError: errS,
|
||||
expectErrorPod: podWithID("foo"),
|
||||
expectErrorPod: podWithID("foo", ""),
|
||||
eventReason: "failedScheduling",
|
||||
}, {
|
||||
sendPod: podWithID("foo"),
|
||||
sendPod: podWithID("foo", ""),
|
||||
algo: mockScheduler{"machine1", nil},
|
||||
expectBind: &api.Binding{ObjectMeta: api.ObjectMeta{Name: "foo"}, Target: api.ObjectReference{Kind: "Node", Name: "machine1"}},
|
||||
injectBindError: errB,
|
||||
expectError: errB,
|
||||
expectErrorPod: podWithID("foo"),
|
||||
expectErrorPod: podWithID("foo", ""),
|
||||
eventReason: "failedScheduling",
|
||||
},
|
||||
}
|
||||
@ -86,8 +96,14 @@ func TestScheduler(t *testing.T) {
|
||||
for i, item := range table {
|
||||
var gotError error
|
||||
var gotPod *api.Pod
|
||||
var gotAssumedPod *api.Pod
|
||||
var gotBinding *api.Binding
|
||||
c := &Config{
|
||||
Modeler: &FakeModeler{
|
||||
AssumePodFunc: func(pod *api.Pod) {
|
||||
gotAssumedPod = pod
|
||||
},
|
||||
},
|
||||
MinionLister: scheduler.FakeMinionLister(
|
||||
api.NodeList{Items: []api.Node{{ObjectMeta: api.ObjectMeta{Name: "machine1"}}}},
|
||||
),
|
||||
@ -114,6 +130,9 @@ func TestScheduler(t *testing.T) {
|
||||
close(called)
|
||||
})
|
||||
s.scheduleOne()
|
||||
if e, a := item.expectAssumedPod, gotAssumedPod; !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("%v: assumed pod: wanted %v, got %v", i, e, a)
|
||||
}
|
||||
if e, a := item.expectErrorPod, gotPod; !reflect.DeepEqual(e, a) {
|
||||
t.Errorf("%v: error pod: wanted %v, got %v", i, e, a)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user