diff --git a/pkg/client/cache/listers.go b/pkg/client/cache/listers.go index b58a79e16cc..f989116fad8 100644 --- a/pkg/client/cache/listers.go +++ b/pkg/client/cache/listers.go @@ -52,6 +52,15 @@ func (s *StoreToPodLister) List(selector labels.Selector) (pods []api.Pod, err e return pods, nil } +// Exists returns true if a pod matching the namespace/name of the given pod exists in the store. +func (s *StoreToPodLister) Exists(pod *api.Pod) (bool, error) { + _, exists, err := s.Store.Get(pod) + if err != nil { + return false, err + } + return exists, nil +} + // StoreToNodeLister makes a Store have the List method of the client.NodeInterface // The Store must contain (only) Nodes. type StoreToNodeLister struct { diff --git a/pkg/client/cache/listers_test.go b/pkg/client/cache/listers_test.go index eded9f5f676..3f759c3c715 100644 --- a/pkg/client/cache/listers_test.go +++ b/pkg/client/cache/listers_test.go @@ -72,5 +72,21 @@ func TestStoreToPodLister(t *testing.T) { t.Errorf("Expected %v, got %v", e, a) continue } + + exists, err := spl.Exists(&api.Pod{ObjectMeta: api.ObjectMeta{Name: id}}) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if !exists { + t.Errorf("exists returned false for %v", id) + } + } + + exists, err := spl.Exists(&api.Pod{ObjectMeta: api.ObjectMeta{Name: "qux"}}) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + if exists { + t.Errorf("Unexpected pod exists") } } diff --git a/pkg/scheduler/predicates.go b/pkg/scheduler/predicates.go index f417111e64e..b4f87ddcd62 100644 --- a/pkg/scheduler/predicates.go +++ b/pkg/scheduler/predicates.go @@ -342,6 +342,15 @@ func MapPodsToMachines(lister PodLister) (map[string][]api.Pod, error) { return map[string][]api.Pod{}, err } for _, scheduledPod := range pods { + // TODO: switch to Spec.Host! There was some confusion previously + // about whether components should judge a pod's location + // based on spec.Host or status.Host. 
It has been decided that + // spec.Host is the canonical location of the pod. Status.Host + // will either be removed, be a copy, or in theory it could be + // used as a signal that kubelet has agreed to run the pod. + // + // This could be fixed now, but just requires someone to try it + // and verify that e2e still passes. host := scheduledPod.Status.Host machineToPods[host] = append(machineToPods[host], scheduledPod) } diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index fefb3f4e5f0..8a5feba76ce 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -41,23 +41,31 @@ type ConfigFactory struct { Client *client.Client // queue for pods that need scheduling PodQueue *cache.FIFO - // a means to list all scheduled pods - PodLister *cache.StoreToPodLister + // a means to list all known scheduled pods. + ScheduledPodLister *cache.StoreToPodLister + // a means to list all known scheduled pods and pods assumed to have been scheduled. + PodLister algorithm.PodLister // a means to list all minions NodeLister *cache.StoreToNodeLister // a means to list all services ServiceLister *cache.StoreToServiceLister + + modeler scheduler.SystemModeler } // Initializes the factory. 
func NewConfigFactory(client *client.Client) *ConfigFactory { - return &ConfigFactory{ - Client: client, - PodQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc), - PodLister: &cache.StoreToPodLister{cache.NewStore(cache.MetaNamespaceKeyFunc)}, - NodeLister: &cache.StoreToNodeLister{cache.NewStore(cache.MetaNamespaceKeyFunc)}, - ServiceLister: &cache.StoreToServiceLister{cache.NewStore(cache.MetaNamespaceKeyFunc)}, + c := &ConfigFactory{ + Client: client, + PodQueue: cache.NewFIFO(cache.MetaNamespaceKeyFunc), + ScheduledPodLister: &cache.StoreToPodLister{cache.NewStore(cache.MetaNamespaceKeyFunc)}, + NodeLister: &cache.StoreToNodeLister{cache.NewStore(cache.MetaNamespaceKeyFunc)}, + ServiceLister: &cache.StoreToServiceLister{cache.NewStore(cache.MetaNamespaceKeyFunc)}, } + modeler := scheduler.NewSimpleModeler(&cache.StoreToPodLister{c.PodQueue}, c.ScheduledPodLister) + c.modeler = modeler + c.PodLister = modeler.PodLister() + return c } // Create creates a scheduler with the default algorithm provider. @@ -118,7 +126,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe // Watch and cache all running pods. Scheduler needs to find all pods // so it knows where it's safe to place a pod. Cache this locally. - cache.NewReflector(f.createAssignedPodLW(), &api.Pod{}, f.PodLister.Store, 0).Run() + cache.NewReflector(f.createAssignedPodLW(), &api.Pod{}, f.ScheduledPodLister.Store, 0).Run() // Watch minions. // Minions may be listed frequently, so provide a local up-to-date cache. @@ -148,6 +156,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe } return &scheduler.Config{ + Modeler: f.modeler, MinionLister: f.NodeLister, Algorithm: algo, Binder: &binder{f.Client}, diff --git a/plugin/pkg/scheduler/modeler.go b/plugin/pkg/scheduler/modeler.go new file mode 100644 index 00000000000..97e0907475b --- /dev/null +++ b/plugin/pkg/scheduler/modeler.go @@ -0,0 +1,155 @@ +/* +Copyright 2015 Google Inc. 
All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduler + +import ( + "fmt" + "strings" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" + + "github.com/golang/glog" +) + +var ( + _ = SystemModeler(&FakeModeler{}) + _ = SystemModeler(&SimpleModeler{}) +) + +// ExtendedPodLister: SimpleModeler needs to be able to check for a pod's +// existence in addition to listing the pods. +type ExtendedPodLister interface { + algorithm.PodLister + Exists(pod *api.Pod) (bool, error) +} + +// FakeModeler implements the SystemModeler interface. +type FakeModeler struct { + AssumePodFunc func(pod *api.Pod) +} + +// AssumePod calls the function variable if it is not nil. +func (f *FakeModeler) AssumePod(pod *api.Pod) { + if f.AssumePodFunc != nil { + f.AssumePodFunc(pod) + } +} + +// SimpleModeler implements the SystemModeler interface with a timed pod cache. +type SimpleModeler struct { + queuedPods ExtendedPodLister + scheduledPods ExtendedPodLister + + // assumedPods holds the pods that we think we've scheduled, but that + // haven't yet shown up in the scheduledPods variable. + // TODO: periodically clear this. + assumedPods *cache.StoreToPodLister +} + +// NewSimpleModeler returns a new SimpleModeler.
+// queuedPods: a PodLister that will return pods that have not been scheduled yet. +// scheduledPods: a PodLister that will return pods that we know for sure have been scheduled. +func NewSimpleModeler(queuedPods, scheduledPods ExtendedPodLister) *SimpleModeler { + return &SimpleModeler{ + queuedPods: queuedPods, + scheduledPods: scheduledPods, + assumedPods: &cache.StoreToPodLister{cache.NewStore(cache.MetaNamespaceKeyFunc)}, + } +} + +func (s *SimpleModeler) AssumePod(pod *api.Pod) { + s.assumedPods.Add(pod) +} + +// Extract names for readable logging. +func podNames(pods []api.Pod) []string { + out := make([]string, len(pods)) + for i := range pods { + out[i] = fmt.Sprintf("'%v/%v (%v)'", pods[i].Namespace, pods[i].Name, pods[i].UID) + } + return out +} + +func (s *SimpleModeler) listPods(selector labels.Selector) (pods []api.Pod, err error) { + assumed, err := s.assumedPods.List(selector) + if err != nil { + return nil, err + } + // Since the assumed list will be short, just check every one. + // Goal here is to stop making assumptions about a pod once it shows + // up in one of these other lists. + // TODO: there's a possibility that a pod could get deleted at the + // exact wrong time and linger in assumedPods forever. So we + // need to go through that periodically and check for deleted + // pods. + for _, pod := range assumed { + qExist, err := s.queuedPods.Exists(&pod) + if err != nil { + return nil, err + } + if qExist { + s.assumedPods.Store.Delete(&pod) + continue + } + sExist, err := s.scheduledPods.Exists(&pod) + if err != nil { + return nil, err + } + if sExist { + s.assumedPods.Store.Delete(&pod) + continue + } + } + + scheduled, err := s.scheduledPods.List(selector) + if err != nil { + return nil, err + } + // re-get in case we deleted any.
+ assumed, err = s.assumedPods.List(selector) + if err != nil { + return nil, err + } + if len(assumed) == 0 { + return scheduled, nil + } + glog.V(2).Infof( + "listing pods: [%v] assumed to exist in addition to %v known pods.", + strings.Join(podNames(assumed), ","), + len(scheduled), + ) + return append(scheduled, assumed...), nil +} + +// PodLister returns a PodLister that will list pods that we think we have scheduled in +// addition to pods that we know have been scheduled. +func (s *SimpleModeler) PodLister() algorithm.PodLister { + return simpleModelerPods{s} +} + +// simpleModelerPods is an adaptor so that SimpleModeler can be a PodLister. +type simpleModelerPods struct { + simpleModeler *SimpleModeler +} + +// List returns pods known and assumed to exist. +func (s simpleModelerPods) List(selector labels.Selector) (pods []api.Pod, err error) { + return s.simpleModeler.listPods(selector) +} diff --git a/plugin/pkg/scheduler/modeler_test.go b/plugin/pkg/scheduler/modeler_test.go new file mode 100644 index 00000000000..45fd3914b89 --- /dev/null +++ b/plugin/pkg/scheduler/modeler_test.go @@ -0,0 +1,111 @@ +/* +Copyright 2015 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package scheduler + +import ( + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" +) + +type nn struct { + namespace, name string +} + +type names []nn + +func (ids names) list() []api.Pod { + out := make([]api.Pod, len(ids)) + for i, id := range ids { + out[i] = api.Pod{ + ObjectMeta: api.ObjectMeta{ + Namespace: id.namespace, + Name: id.name, + }, + } + } + return out +} + +func (ids names) has(pod *api.Pod) bool { + for _, id := range ids { + if pod.Namespace == id.namespace && pod.Name == id.name { + return true + } + } + return false +} + +func TestModeler(t *testing.T) { + table := []struct { + queuedPods []api.Pod + scheduledPods []api.Pod + assumedPods []api.Pod + expectPods names + }{ + { + queuedPods: names{}.list(), + scheduledPods: names{{"default", "foo"}, {"custom", "foo"}}.list(), + assumedPods: names{{"default", "foo"}}.list(), + expectPods: names{{"default", "foo"}, {"custom", "foo"}}, + }, { + queuedPods: names{}.list(), + scheduledPods: names{{"default", "foo"}}.list(), + assumedPods: names{{"default", "foo"}, {"custom", "foo"}}.list(), + expectPods: names{{"default", "foo"}, {"custom", "foo"}}, + }, { + queuedPods: names{{"custom", "foo"}}.list(), + scheduledPods: names{{"default", "foo"}}.list(), + assumedPods: names{{"default", "foo"}, {"custom", "foo"}}.list(), + expectPods: names{{"default", "foo"}}, + }, + } + + for _, item := range table { + q := &cache.StoreToPodLister{cache.NewStore(cache.MetaNamespaceKeyFunc)} + for i := range item.queuedPods { + q.Store.Add(&item.queuedPods[i]) + } + s := &cache.StoreToPodLister{cache.NewStore(cache.MetaNamespaceKeyFunc)} + for i := range item.scheduledPods { + s.Store.Add(&item.scheduledPods[i]) + } + m := NewSimpleModeler(q, s) + for i := range item.assumedPods { + m.AssumePod(&item.assumedPods[i]) + } + + list, err := m.PodLister().List(labels.Everything()) + 
if err != nil { + t.Errorf("unexpected error: %v", err) + } + + found := 0 + for _, pod := range list { + if item.expectPods.has(&pod) { + found++ + } else { + t.Errorf("found unexpected pod %#v", pod) + } + } + if e, a := item.expectPods, found; len(e) != a { + t.Errorf("Expected pods:\n%+v\nFound pods:\n%v\n", e, list) + } + } +} diff --git a/plugin/pkg/scheduler/scheduler.go b/plugin/pkg/scheduler/scheduler.go index a23205f18a7..86b45f4322b 100644 --- a/plugin/pkg/scheduler/scheduler.go +++ b/plugin/pkg/scheduler/scheduler.go @@ -31,6 +31,21 @@ type Binder interface { Bind(binding *api.Binding) error } +// SystemModeler can help scheduler produce a model of the system that +// anticipates reality. For example, if scheduler has pods A and B both +// using hostPort 80, when it binds A to machine M it should not bind B +// to machine M in the time when it hasn't observed the binding of A +// take effect yet. +// +// Since the model is only an optimization, it's expected to handle +// any errors itself without sending them back to the scheduler. +type SystemModeler interface { + // AssumePod assumes that the given pod exists in the system. + // The assumption should last until the system confirms the + // assumption or disconfirms it. + AssumePod(pod *api.Pod) +} + // Scheduler watches for new unscheduled pods. It attempts to find // minions that they fit on and writes bindings back to the api server. type Scheduler struct { @@ -38,6 +53,9 @@ type Scheduler struct { } type Config struct { + // It is expected that changes made via modeler will be observed + // by MinionLister and Algorithm. + Modeler SystemModeler MinionLister scheduler.MinionLister Algorithm scheduler.Scheduler Binder Binder @@ -93,4 +111,9 @@ func (s *Scheduler) scheduleOne() { return } s.config.Recorder.Eventf(pod, "scheduled", "Successfully assigned %v to %v", pod.Name, dest) + // tell the model to assume that this binding took effect.
+ assumed := *pod + assumed.Spec.Host = dest + assumed.Status.Host = dest + s.config.Modeler.AssumePod(&assumed) } diff --git a/plugin/pkg/scheduler/scheduler_test.go b/plugin/pkg/scheduler/scheduler_test.go index 5374b929cf9..ef07d4e3139 100644 --- a/plugin/pkg/scheduler/scheduler_test.go +++ b/plugin/pkg/scheduler/scheduler_test.go @@ -34,8 +34,16 @@ type fakeBinder struct { func (fb fakeBinder) Bind(binding *api.Binding) error { return fb.b(binding) } -func podWithID(id string) *api.Pod { - return &api.Pod{ObjectMeta: api.ObjectMeta{Name: id, SelfLink: testapi.SelfLink("pods", id)}} +func podWithID(id, desiredHost string) *api.Pod { + return &api.Pod{ + ObjectMeta: api.ObjectMeta{Name: id, SelfLink: testapi.SelfLink("pods", id)}, + Spec: api.PodSpec{ + Host: desiredHost, + }, + Status: api.PodStatus{ + Host: desiredHost, + }, + } } type mockScheduler struct { @@ -53,32 +61,34 @@ func TestScheduler(t *testing.T) { errB := errors.New("binder") table := []struct { - injectBindError error - sendPod *api.Pod - algo scheduler.Scheduler - expectErrorPod *api.Pod - expectError error - expectBind *api.Binding - eventReason string + injectBindError error + sendPod *api.Pod + algo scheduler.Scheduler + expectErrorPod *api.Pod + expectAssumedPod *api.Pod + expectError error + expectBind *api.Binding + eventReason string }{ { - sendPod: podWithID("foo"), - algo: mockScheduler{"machine1", nil}, - expectBind: &api.Binding{ObjectMeta: api.ObjectMeta{Name: "foo"}, Target: api.ObjectReference{Kind: "Node", Name: "machine1"}}, - eventReason: "scheduled", + sendPod: podWithID("foo", ""), + algo: mockScheduler{"machine1", nil}, + expectBind: &api.Binding{ObjectMeta: api.ObjectMeta{Name: "foo"}, Target: api.ObjectReference{Kind: "Node", Name: "machine1"}}, + expectAssumedPod: podWithID("foo", "machine1"), + eventReason: "scheduled", }, { - sendPod: podWithID("foo"), + sendPod: podWithID("foo", ""), algo: mockScheduler{"machine1", errS}, expectError: errS, - expectErrorPod: 
podWithID("foo"), + expectErrorPod: podWithID("foo", ""), eventReason: "failedScheduling", }, { - sendPod: podWithID("foo"), + sendPod: podWithID("foo", ""), algo: mockScheduler{"machine1", nil}, expectBind: &api.Binding{ObjectMeta: api.ObjectMeta{Name: "foo"}, Target: api.ObjectReference{Kind: "Node", Name: "machine1"}}, injectBindError: errB, expectError: errB, - expectErrorPod: podWithID("foo"), + expectErrorPod: podWithID("foo", ""), eventReason: "failedScheduling", }, } @@ -86,8 +96,14 @@ func TestScheduler(t *testing.T) { for i, item := range table { var gotError error var gotPod *api.Pod + var gotAssumedPod *api.Pod var gotBinding *api.Binding c := &Config{ + Modeler: &FakeModeler{ + AssumePodFunc: func(pod *api.Pod) { + gotAssumedPod = pod + }, + }, MinionLister: scheduler.FakeMinionLister( api.NodeList{Items: []api.Node{{ObjectMeta: api.ObjectMeta{Name: "machine1"}}}}, ), @@ -114,6 +130,9 @@ func TestScheduler(t *testing.T) { close(called) }) s.scheduleOne() + if e, a := item.expectAssumedPod, gotAssumedPod; !reflect.DeepEqual(e, a) { + t.Errorf("%v: assumed pod: wanted %v, got %v", i, e, a) + } if e, a := item.expectErrorPod, gotPod; !reflect.DeepEqual(e, a) { t.Errorf("%v: error pod: wanted %v, got %v", i, e, a) }