From 4c3f509d94d63db655da8f339c363bbc3300ee53 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Thu, 14 Aug 2014 15:42:05 -0700 Subject: [PATCH 1/8] Make cache.Reflector more injectable. Add test for resource version state keeping. --- pkg/client/cache/reflector.go | 44 +++++++++------ pkg/client/cache/reflector_test.go | 90 ++++++++++++++++++++---------- 2 files changed, 86 insertions(+), 48 deletions(-) diff --git a/pkg/client/cache/reflector.go b/pkg/client/cache/reflector.go index bb7be90660a..14b97903b47 100644 --- a/pkg/client/cache/reflector.go +++ b/pkg/client/cache/reflector.go @@ -21,7 +21,6 @@ import ( "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/golang/glog" @@ -41,42 +40,49 @@ type Store interface { // Reflector watches a specified resource and causes all changes to be reflected in the given store. type Reflector struct { - kubeClient *client.Client - resource string + // The type of object we expect to place in the store. expectedType reflect.Type - store Store + // The destination to sync up with the watch source + store Store + // watchCreater is called to initiate watches. + watchFactory WatchFactory + // loopDelay controls timing between one watch ending and + // the beginning of the next one. + loopDelay time.Duration } +// WatchFactory should begin a watch at the specified version. +type WatchFactory func(resourceVersion uint64) (watch.Interface, error) + // NewReflector makes a new Reflector object which will keep the given store up to // date with the server's contents for the given resource. Reflector promises to // only put things in the store that have the type of expectedType. -// TODO: define a query so you only locally cache a subset of items. 
-func NewReflector(resource string, kubeClient *client.Client, expectedType interface{}, store Store) *Reflector { +func NewReflector(watchFactory WatchFactory, expectedType interface{}, store Store) *Reflector { gc := &Reflector{ - resource: resource, - kubeClient: kubeClient, + watchFactory: watchFactory, store: store, expectedType: reflect.TypeOf(expectedType), + loopDelay: time.Second, } return gc } +// Run starts a watch and handles watch events. Will restart the watch if it is closed. +// Run starts a goroutine and returns immediately. func (gc *Reflector) Run() { + var resourceVersion uint64 go util.Forever(func() { - w, err := gc.startWatch() + w, err := gc.watchFactory(resourceVersion) if err != nil { - glog.Errorf("failed to watch %v: %v", gc.resource, err) + glog.Errorf("failed to watch %v: %v", gc.expectedType, err) return } - gc.watchHandler(w) - }, 5*time.Second) + gc.watchHandler(w, &resourceVersion) + }, gc.loopDelay) } -func (gc *Reflector) startWatch() (watch.Interface, error) { - return gc.kubeClient.Get().Path(gc.resource).Path("watch").Watch() -} - -func (gc *Reflector) watchHandler(w watch.Interface) { +// watchHandler watches w and keeps *resourceVersion up to date. +func (gc *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) { for { event, ok := <-w.ResultChan() if !ok { @@ -102,5 +108,9 @@ func (gc *Reflector) watchHandler(w watch.Interface) { default: glog.Errorf("unable to understand watch event %#v", event) } + next := jsonBase.ResourceVersion() + 1 + if next > *resourceVersion { + *resourceVersion = next + } } } diff --git a/pkg/client/cache/reflector_test.go b/pkg/client/cache/reflector_test.go index ec32a7727a7..fa2b702a64a 100644 --- a/pkg/client/cache/reflector_test.go +++ b/pkg/client/cache/reflector_test.go @@ -17,29 +17,27 @@ limitations under the License. 
package cache import ( - "net/http" - "net/http/httptest" "testing" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" - "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) func TestReflector_watchHandler(t *testing.T) { s := NewStore() - g := NewReflector("foo", nil, &api.Pod{}, s) + g := NewReflector(nil, &api.Pod{}, s) fw := watch.NewFake() s.Add("foo", &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}) s.Add("bar", &api.Pod{JSONBase: api.JSONBase{ID: "bar"}}) go func() { fw.Modify(&api.Pod{JSONBase: api.JSONBase{ID: "bar", ResourceVersion: 55}}) - fw.Add(&api.Pod{JSONBase: api.JSONBase{ID: "baz"}}) + fw.Add(&api.Pod{JSONBase: api.JSONBase{ID: "baz", ResourceVersion: 32}}) fw.Add(&api.Service{JSONBase: api.JSONBase{ID: "rejected"}}) fw.Delete(&api.Pod{JSONBase: api.JSONBase{ID: "foo"}}) fw.Stop() }() - g.watchHandler(fw) + var resumeRV uint64 + g.watchHandler(fw, &resumeRV) table := []struct { ID string @@ -49,7 +47,7 @@ func TestReflector_watchHandler(t *testing.T) { {"foo", 0, false}, {"rejected", 0, false}, {"bar", 55, true}, - {"baz", 0, true}, + {"baz", 32, true}, } for _, item := range table { obj, exists := s.Get(item.ID) @@ -63,32 +61,62 @@ func TestReflector_watchHandler(t *testing.T) { t.Errorf("%v: expected %v, got %v", item.ID, e, a) } } + + // RV should stay 1 higher than the highest id we see. 
+ if e, a := uint64(56), resumeRV; e != a { + t.Errorf("expected %v, got %v", e, a) + } } -func TestReflector_startWatch(t *testing.T) { - table := []struct{ resource, path string }{ - {"pods", "/api/v1beta1/pods/watch"}, - {"services", "/api/v1beta1/services/watch"}, - } - for _, testItem := range table { - got := make(chan struct{}) - srv := httptest.NewServer(http.HandlerFunc( - func(w http.ResponseWriter, req *http.Request) { - w.WriteHeader(http.StatusNotFound) - if req.URL.Path == testItem.path { - close(got) - return - } - t.Errorf("unexpected path %v", req.URL.Path) - })) - s := NewStore() - c := client.New(srv.URL, nil) - g := NewReflector(testItem.resource, c, &api.Pod{}, s) - _, err := g.startWatch() - // We're just checking that it watches the right path. - if err == nil { - t.Errorf("unexpected non-error") +func TestReflector_Run(t *testing.T) { + createdFakes := make(chan *watch.FakeWatcher) + + // Expect our starter to get called at the beginning of the watch with 0, and again with 3 when we + // inject an error at 2. + expectedRVs := []uint64{0, 3} + watchStarter := func(rv uint64) (watch.Interface, error) { + fw := watch.NewFake() + if e, a := expectedRVs[0], rv; e != a { + t.Errorf("Expected rv %v, but got %v", e, a) } - <-got + expectedRVs = expectedRVs[1:] + // channel is not buffered because the for loop below needs to block. But + // we don't want to block here, so report the new fake via a go routine. + go func() { createdFakes <- fw }() + return fw, nil + } + s := NewFIFO() + r := NewReflector(watchStarter, &api.Pod{}, s) + r.loopDelay = 0 + r.Run() + + ids := []string{"foo", "bar", "baz", "qux", "zoo"} + var fw *watch.FakeWatcher + for i, id := range ids { + if fw == nil { + fw = <-createdFakes + } + sendingRV := uint64(i + 1) + fw.Add(&api.Pod{JSONBase: api.JSONBase{ID: id, ResourceVersion: sendingRV}}) + if sendingRV == 2 { + // Inject a failure. 
+ fw.Stop() + fw = nil + } + } + + // Verify we received the right ids with the right resource versions. + for i, id := range ids { + pod := s.Pop().(*api.Pod) + if e, a := id, pod.ID; e != a { + t.Errorf("%v: Expected %v, got %v", i, e, a) + } + if e, a := uint64(i+1), pod.ResourceVersion; e != a { + t.Errorf("%v: Expected %v, got %v", i, e, a) + } + } + + if len(expectedRVs) != 0 { + t.Error("called watchStarter an unexpected number of times") } } From dddad888b5e01c9f397484477cef9a391ea4b681 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Sun, 3 Aug 2014 00:01:28 -0700 Subject: [PATCH 2/8] Begin scheduler plugin --- hack/build-go.sh | 2 +- pkg/scheduler/listers.go | 1 + pkg/scheduler/randomfit.go | 1 + plugin/cmd/scheduler/scheduler.go | 141 +++++++++++++++++++++++++ plugin/pkg/scheduler/scheduler.go | 80 ++++++++++++++ plugin/pkg/scheduler/scheduler_test.go | 110 +++++++++++++++++++ 6 files changed, 334 insertions(+), 1 deletion(-) create mode 100644 plugin/cmd/scheduler/scheduler.go create mode 100644 plugin/pkg/scheduler/scheduler.go create mode 100644 plugin/pkg/scheduler/scheduler_test.go diff --git a/hack/build-go.sh b/hack/build-go.sh index 6a5def2016e..b2044ae288f 100755 --- a/hack/build-go.sh +++ b/hack/build-go.sh @@ -33,7 +33,7 @@ cd "${KUBE_REPO_ROOT}" if [[ $# == 0 ]]; then # Update $@ with the default list of targets to build. - set -- cmd/proxy cmd/apiserver cmd/controller-manager cmd/kubelet cmd/kubecfg + set -- cmd/proxy cmd/apiserver cmd/controller-manager cmd/kubelet cmd/kubecfg plugin/cmd/scheduler fi binaries=() diff --git a/pkg/scheduler/listers.go b/pkg/scheduler/listers.go index 96b216d8b44..e48e9ad7c66 100644 --- a/pkg/scheduler/listers.go +++ b/pkg/scheduler/listers.go @@ -36,6 +36,7 @@ func (f FakeMinionLister) List() ([]string, error) { // PodLister interface represents anything that can list pods for a scheduler type PodLister interface { + // TODO: make this exactly the same as client's ListPods() method... 
ListPods(labels.Selector) ([]api.Pod, error) } diff --git a/pkg/scheduler/randomfit.go b/pkg/scheduler/randomfit.go index ce32d741c6d..92e66dd5cc7 100644 --- a/pkg/scheduler/randomfit.go +++ b/pkg/scheduler/randomfit.go @@ -57,6 +57,7 @@ func (s *RandomFitScheduler) Schedule(pod api.Pod, minionLister MinionLister) (s return "", err } machineToPods := map[string][]api.Pod{} + // TODO: perform more targeted query... pods, err := s.podLister.ListPods(labels.Everything()) if err != nil { return "", err diff --git a/plugin/cmd/scheduler/scheduler.go b/plugin/cmd/scheduler/scheduler.go new file mode 100644 index 00000000000..c2725ca539a --- /dev/null +++ b/plugin/cmd/scheduler/scheduler.go @@ -0,0 +1,141 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package main + +import ( + "flag" + "math/rand" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + verflag "github.com/GoogleCloudPlatform/kubernetes/pkg/version/flag" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler" +) + +var ( + master = flag.String("master", "", "The address of the Kubernetes API server") +) + +// storeToMinionLister turns a store into a minion lister. The store must contain (only) minions. +type storeToMinionLister struct { + s cache.Store +} + +func (s storeToMinionLister) List() (machines []string, err error) { + for _, m := range s.s.List() { + machines = append(machines, m.(*api.Minion).ID) + } + return machines, nil +} + +// storeToPodLister turns a store into a pod lister. The store must contain (only) pods. +type storeToPodLister struct { + s cache.Store +} + +func (s storeToPodLister) ListPods(selector labels.Selector) (pods []api.Pod, err error) { + for _, m := range s.s.List() { + pod := m.(*api.Pod) + if selector.Matches(labels.Set(pod.Labels)) { + pods = append(pods, *pod) + } + } + return pods, nil +} + +type binder struct { + kubeClient *client.Client +} + +// Bind just does a POST binding RPC. +func (b binder) Bind(binding *api.Binding) error { + return b.kubeClient.Post().Path("bindings").Body(binding).Do().Error() +} + +func main() { + flag.Parse() + util.InitLogs() + defer util.FlushLogs() + + verflag.PrintAndExitIfRequested() + + // This function is long because we inject all the dependencies into scheduler here. + + // TODO: security story for plugins! 
+ kubeClient := client.New("http://"+*master, nil) + + // Watch and queue pods that need scheduling. + podQueue := cache.NewFIFO() + cache.NewReflector(func(resourceVersion uint64) (watch.Interface, error) { + // This query will only find pods with no assigned host. + return kubeClient. + Get(). + Path("pods"). + Path("watch"). + SelectorParam("fields", labels.Set{"DesiredState.Host": ""}.AsSelector()). + UintParam("resourceVersion", resourceVersion). + Watch() + }, &api.Pod{}, podQueue).Run() + + // Watch and cache all running pods. Scheduler needs to find all pods + // so it knows where it's safe to place a pod. Cache this locally. + podCache := cache.NewStore() + cache.NewReflector(func(resourceVersion uint64) (watch.Interface, error) { + // This query will only find pods that do have an assigned host. + return kubeClient. + Get(). + Path("pods"). + Path("watch"). + ParseSelectorParam("fields", "DesiredState.Host!="). + UintParam("resourceVersion", resourceVersion). + Watch() + }, &api.Pod{}, podCache).Run() + + // Watch minions. + // Minions may be listed frequently, so provide a local up-to-date cache. + minionCache := cache.NewStore() + cache.NewReflector(func(resourceVersion uint64) (watch.Interface, error) { + // This query will only find pods that do have an assigned host. + return kubeClient. + Get(). + Path("minions"). + Path("watch"). + UintParam("resourceVersion", resourceVersion). 
+ Watch() + }, &api.Minion{}, minionCache).Run() + + r := rand.New(rand.NewSource(time.Now().UnixNano())) + algo := algorithm.NewRandomFitScheduler( + storeToPodLister{podCache}, r) + + s := scheduler.New(&scheduler.Config{ + MinionLister: storeToMinionLister{minionCache}, + Algorithm: algo, + NextPod: func() *api.Pod { return podQueue.Pop().(*api.Pod) }, + Binder: binder{kubeClient}, + }) + + s.Run() + + select {} +} diff --git a/plugin/pkg/scheduler/scheduler.go b/plugin/pkg/scheduler/scheduler.go new file mode 100644 index 00000000000..0b5234ec700 --- /dev/null +++ b/plugin/pkg/scheduler/scheduler.go @@ -0,0 +1,80 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package scheduler + +import ( + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + // TODO: move everything from pkg/scheduler into this package. Remove references from registry. + "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" +) + +// Binder knows how to write a binding. +type Binder interface { + Bind(binding *api.Binding) error +} + +// Scheduler watches for new unscheduled pods. It attempts to find +// minions that they fit on and writes bindings back to the api server. 
+type Scheduler struct { + c *Config +} + +type Config struct { + MinionLister scheduler.MinionLister + Algorithm scheduler.Scheduler + Binder Binder + + // NextPod should be a function that blocks until the next pod + // is available. We don't use a channel for this, because scheduling + // a pod may take some amount of time and we don't want pods to get + // stale while they sit in a channel. + NextPod func() *api.Pod + + // Error is called if there is an error. It is passed the pod in + // question, and the error + Error func(*api.Pod, error) +} + +// New returns a new scheduler. +func New(c *Config) *Scheduler { + s := &Scheduler{ + c: c, + } + return s +} + +// Run begins watching and scheduling. +func (s *Scheduler) Run() { + go util.Forever(s.scheduleOne, 0) +} + +func (s *Scheduler) scheduleOne() { + pod := s.c.NextPod() + dest, err := s.c.Algorithm.Schedule(*pod, s.c.MinionLister) + if err != nil { + s.c.Error(pod, err) + return + } + b := &api.Binding{ + PodID: pod.ID, + Host: dest, + } + if err := s.c.Binder.Bind(b); err != nil { + s.c.Error(pod, err) + } +} diff --git a/plugin/pkg/scheduler/scheduler_test.go b/plugin/pkg/scheduler/scheduler_test.go new file mode 100644 index 00000000000..f6c597abb82 --- /dev/null +++ b/plugin/pkg/scheduler/scheduler_test.go @@ -0,0 +1,110 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package scheduler + +import ( + "errors" + "reflect" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" +) + +type fakeBinder struct { + b func(binding *api.Binding) error +} + +func (fb fakeBinder) Bind(binding *api.Binding) error { return fb.b(binding) } + +func podWithID(id string) *api.Pod { + return &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} +} + +type mockScheduler struct { + machine string + err error +} + +func (es mockScheduler) Schedule(pod api.Pod, ml scheduler.MinionLister) (string, error) { + return es.machine, es.err +} + +func TestScheduler(t *testing.T) { + + errS := errors.New("scheduler") + errB := errors.New("binder") + + table := []struct { + injectBindError error + sendPod *api.Pod + algo scheduler.Scheduler + expectErrorPod *api.Pod + expectError error + expectBind *api.Binding + }{ + { + sendPod: podWithID("foo"), + algo: mockScheduler{"machine1", nil}, + expectBind: &api.Binding{PodID: "foo", Host: "machine1"}, + }, { + sendPod: podWithID("foo"), + algo: mockScheduler{"machine1", errS}, + expectError: errS, + expectErrorPod: podWithID("foo"), + }, { + sendPod: podWithID("foo"), + algo: mockScheduler{"machine1", nil}, + expectBind: &api.Binding{PodID: "foo", Host: "machine1"}, + injectBindError: errB, + expectError: errB, + expectErrorPod: podWithID("foo"), + }, + } + + for i, item := range table { + var gotError error + var gotPod *api.Pod + var gotBinding *api.Binding + c := &Config{ + MinionLister: scheduler.FakeMinionLister{"machine1"}, + Algorithm: item.algo, + Binder: fakeBinder{func(b *api.Binding) error { + gotBinding = b + return item.injectBindError + }}, + Error: func(p *api.Pod, err error) { + gotPod = p + gotError = err + }, + NextPod: func() *api.Pod { + return item.sendPod + }, + } + s := New(c) + s.scheduleOne() + if e, a := item.expectErrorPod, gotPod; !reflect.DeepEqual(e, a) { + t.Errorf("%v: error pod: wanted %v, got %v", i, e, a) + } + if 
e, a := item.expectError, gotError; !reflect.DeepEqual(e, a) { + t.Errorf("%v: error: wanted %v, got %v", i, e, a) + } + if e, a := item.expectBind, gotBinding; !reflect.DeepEqual(e, a) { + t.Errorf("%v: error: wanted %v, got %v", i, e, a) + } + } +} From 4c4ca590503f39cdd6d09054c454b52f17441f8c Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Mon, 18 Aug 2014 14:47:20 -0700 Subject: [PATCH 3/8] Add poller to cache. --- pkg/client/cache/fifo.go | 35 ++++++--- pkg/client/cache/poller.go | 81 ++++++++++++++++++++ pkg/client/cache/poller_test.go | 119 +++++++++++++++++++++++++++++ pkg/client/cache/reflector.go | 32 +++----- pkg/client/cache/reflector_test.go | 10 +-- pkg/client/cache/store.go | 48 +++++++++--- pkg/client/cache/store_test.go | 13 +++- 7 files changed, 290 insertions(+), 48 deletions(-) create mode 100644 pkg/client/cache/poller.go create mode 100644 pkg/client/cache/poller_test.go diff --git a/pkg/client/cache/fifo.go b/pkg/client/cache/fifo.go index 8189305812a..07463e72c35 100644 --- a/pkg/client/cache/fifo.go +++ b/pkg/client/cache/fifo.go @@ -18,6 +18,8 @@ package cache import ( "sync" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) // FIFO receives adds and updates from a Reflector, and puts them in a queue for @@ -33,30 +35,30 @@ type FIFO struct { } // Add inserts an item, and puts it in the queue. -func (f *FIFO) Add(ID string, obj interface{}) { +func (f *FIFO) Add(id string, obj interface{}) { f.lock.Lock() defer f.lock.Unlock() - f.items[ID] = obj - f.queue = append(f.queue, ID) + f.items[id] = obj + f.queue = append(f.queue, id) f.cond.Broadcast() } // Update updates an item, and adds it to the queue. -func (f *FIFO) Update(ID string, obj interface{}) { +func (f *FIFO) Update(id string, obj interface{}) { f.lock.Lock() defer f.lock.Unlock() - f.items[ID] = obj - f.queue = append(f.queue, ID) + f.items[id] = obj + f.queue = append(f.queue, id) f.cond.Broadcast() } // Delete removes an item. 
It doesn't add it to the queue, because // this implementation assumes the consumer only cares about the objects, // not the order in which they were created/added. -func (f *FIFO) Delete(ID string, obj interface{}) { +func (f *FIFO) Delete(id string) { f.lock.Lock() defer f.lock.Unlock() - delete(f.items, ID) + delete(f.items, id) } // List returns a list of all the items. @@ -70,11 +72,24 @@ func (f *FIFO) List() []interface{} { return list } +// Contains returns a util.StringSet containing all IDs of stored the items. +// This is a snapshot of a moment in time, and one should keep in mind that +// other go routines can add or remove items after you call this. +func (c *FIFO) Contains() util.StringSet { + c.lock.RLock() + defer c.lock.RUnlock() + set := util.StringSet{} + for id := range c.items { + set.Insert(id) + } + return set +} + // Get returns the requested item, or sets exists=false. -func (f *FIFO) Get(ID string) (item interface{}, exists bool) { +func (f *FIFO) Get(id string) (item interface{}, exists bool) { f.lock.RLock() defer f.lock.RUnlock() - item, exists = f.items[ID] + item, exists = f.items[id] return item, exists } diff --git a/pkg/client/cache/poller.go b/pkg/client/cache/poller.go new file mode 100644 index 00000000000..d2c379b7868 --- /dev/null +++ b/pkg/client/cache/poller.go @@ -0,0 +1,81 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ + +package cache + +import ( + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/golang/glog" +) + +// Enumerator should be able to return the list of objects to be synced with +// one object at a time. +type Enumerator interface { + Len() int + Get(index int) (ID string, object interface{}) +} + +// GetFunc should return an enumerator that you wish the Poller to proccess. +type GetFunc func() (Enumerator, error) + +// Poller is like Reflector, but it periodically polls instead of watching. +// This is intended to be a workaround for api objects that don't yet support +// watching. +type Poller struct { + getFunc GetFunc + period time.Duration + store Store +} + +// NewPoller constructs a new poller. Note that polling probably doesn't make much +// sense to use along with the FIFO queue. The returned Poller will call getFunc and +// sync the objects in 'store' with the returned Enumerator, waiting 'period' between +// each call. It probably only makes sense to use a poller if you're treating the +// store as read-only. +func NewPoller(getFunc GetFunc, period time.Duration, store Store) *Poller { + return &Poller{ + getFunc: getFunc, + period: period, + store: store, + } +} + +// Run begins polling. It starts a goroutine and returns immediately. +func (p *Poller) Run() { + go util.Forever(func() { + e, err := p.getFunc() + if err != nil { + glog.Errorf("failed to list: %v", err) + return + } + p.sync(e) + }, p.period) +} + +func (p *Poller) sync(e Enumerator) { + current := p.store.Contains() + for i := 0; i < e.Len(); i++ { + id, object := e.Get(i) + p.store.Update(id, object) + current.Delete(id) + } + // Delete all the objects not found. + for id := range current { + p.store.Delete(id) + } +} diff --git a/pkg/client/cache/poller_test.go b/pkg/client/cache/poller_test.go new file mode 100644 index 00000000000..5ebfc21f260 --- /dev/null +++ b/pkg/client/cache/poller_test.go @@ -0,0 +1,119 @@ +/* +Copyright 2014 Google Inc. 
All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "errors" + "reflect" + "testing" + "time" +) + +type testPair struct { + id string + obj interface{} +} +type testEnumerator []testPair + +func (t testEnumerator) Len() int { return len(t) } +func (t testEnumerator) Get(i int) (string, interface{}) { + return t[i].id, t[i].obj +} + +func TestPoller_sync(t *testing.T) { + table := []struct { + // each step simulates the list that a getFunc would receive. + steps [][]testPair + }{ + { + steps: [][]testPair{ + { + {"foo", "foo1"}, + {"bar", "bar1"}, + {"baz", "baz1"}, + {"qux", "qux1"}, + }, { + {"foo", "foo2"}, + {"bar", "bar2"}, + {"qux", "qux2"}, + }, { + {"bar", "bar3"}, + {"baz", "baz2"}, + {"qux", "qux3"}, + }, { + {"qux", "qux4"}, + }, { + {"foo", "foo3"}, + }, + }, + }, + } + + for testCase, item := range table { + s := NewStore() + // This is a unit test for the sync function, hence the nil getFunc. 
+ p := NewPoller(nil, 0, s) + for line, pairs := range item.steps { + p.sync(testEnumerator(pairs)) + + ids := s.Contains() + for _, pair := range pairs { + if !ids.Has(pair.id) { + t.Errorf("%v, %v: expected to find entry for %v, but did not.", testCase, line, pair.id) + continue + } + found, ok := s.Get(pair.id) + if !ok { + t.Errorf("%v, %v: unexpected absent entry for %v", testCase, line, pair.id) + continue + } + if e, a := pair.obj, found; !reflect.DeepEqual(e, a) { + t.Errorf("%v, %v: expected %v, got %v for %v", testCase, line, e, a, pair.id) + } + } + if e, a := len(pairs), len(ids); e != a { + t.Errorf("%v, %v: expected len %v, got %v", testCase, line, e, a) + } + } + } +} + +func TestPoller_Run(t *testing.T) { + s := NewStore() + const count = 10 + var called = 0 + done := make(chan struct{}) + NewPoller(func() (Enumerator, error) { + called++ + if called == count { + close(done) + } + // test both error and regular returns. + if called&1 == 0 { + return testEnumerator{}, nil + } + return nil, errors.New("transient error") + }, time.Millisecond, s).Run() + + // The test here is that we get called at least count times. + <-done + + // We never added anything, verify that. + if e, a := 0, len(s.Contains()); e != a { + t.Errorf("expected %v, got %v", e, a) + } +} diff --git a/pkg/client/cache/reflector.go b/pkg/client/cache/reflector.go index 14b97903b47..4df55155ab7 100644 --- a/pkg/client/cache/reflector.go +++ b/pkg/client/cache/reflector.go @@ -26,29 +26,17 @@ import ( "github.com/golang/glog" ) -// Store is a generic object storage interface. Reflector knows how to watch a server -// and update a store. A generic store is provided, which allows Reflector to be used -// as a local caching system, and an LRU store, which allows Reflector to work like a -// queue of items yet to be processed. 
-type Store interface { - Add(ID string, obj interface{}) - Update(ID string, obj interface{}) - Delete(ID string, obj interface{}) - List() []interface{} - Get(ID string) (item interface{}, exists bool) -} - // Reflector watches a specified resource and causes all changes to be reflected in the given store. type Reflector struct { // The type of object we expect to place in the store. expectedType reflect.Type // The destination to sync up with the watch source store Store - // watchCreater is called to initiate watches. + // watchFactory is called to initiate watches. watchFactory WatchFactory - // loopDelay controls timing between one watch ending and + // period controls timing between one watch ending and // the beginning of the next one. - loopDelay time.Duration + period time.Duration } // WatchFactory should begin a watch at the specified version. @@ -62,7 +50,7 @@ func NewReflector(watchFactory WatchFactory, expectedType interface{}, store Sto watchFactory: watchFactory, store: store, expectedType: reflect.TypeOf(expectedType), - loopDelay: time.Second, + period: time.Second, } return gc } @@ -78,7 +66,7 @@ func (gc *Reflector) Run() { return } gc.watchHandler(w, &resourceVersion) - }, gc.loopDelay) + }, gc.period) } // watchHandler watches w and keeps *resourceVersion up to date. @@ -104,13 +92,13 @@ func (gc *Reflector) watchHandler(w watch.Interface, resourceVersion *uint64) { case watch.Modified: gc.store.Update(jsonBase.ID(), event.Object) case watch.Deleted: - gc.store.Delete(jsonBase.ID(), event.Object) + // TODO: Will any consumers need access to the "last known + // state", which is passed in event.Object? If so, may need + // to change this. 
+ gc.store.Delete(jsonBase.ID()) default: glog.Errorf("unable to understand watch event %#v", event) } - next := jsonBase.ResourceVersion() + 1 - if next > *resourceVersion { - *resourceVersion = next - } + *resourceVersion = jsonBase.ResourceVersion() + 1 } } diff --git a/pkg/client/cache/reflector_test.go b/pkg/client/cache/reflector_test.go index fa2b702a64a..89e5b7386da 100644 --- a/pkg/client/cache/reflector_test.go +++ b/pkg/client/cache/reflector_test.go @@ -30,10 +30,10 @@ func TestReflector_watchHandler(t *testing.T) { s.Add("foo", &api.Pod{JSONBase: api.JSONBase{ID: "foo"}}) s.Add("bar", &api.Pod{JSONBase: api.JSONBase{ID: "bar"}}) go func() { - fw.Modify(&api.Pod{JSONBase: api.JSONBase{ID: "bar", ResourceVersion: 55}}) - fw.Add(&api.Pod{JSONBase: api.JSONBase{ID: "baz", ResourceVersion: 32}}) fw.Add(&api.Service{JSONBase: api.JSONBase{ID: "rejected"}}) fw.Delete(&api.Pod{JSONBase: api.JSONBase{ID: "foo"}}) + fw.Modify(&api.Pod{JSONBase: api.JSONBase{ID: "bar", ResourceVersion: 55}}) + fw.Add(&api.Pod{JSONBase: api.JSONBase{ID: "baz", ResourceVersion: 32}}) fw.Stop() }() var resumeRV uint64 @@ -62,8 +62,8 @@ func TestReflector_watchHandler(t *testing.T) { } } - // RV should stay 1 higher than the highest id we see. - if e, a := uint64(56), resumeRV; e != a { + // RV should stay 1 higher than the last id we see. + if e, a := uint64(33), resumeRV; e != a { t.Errorf("expected %v, got %v", e, a) } } @@ -87,7 +87,7 @@ func TestReflector_Run(t *testing.T) { } s := NewFIFO() r := NewReflector(watchStarter, &api.Pod{}, s) - r.loopDelay = 0 + r.period = 0 r.Run() ids := []string{"foo", "bar", "baz", "qux", "zoo"} diff --git a/pkg/client/cache/store.go b/pkg/client/cache/store.go index 016aa18ce84..4b27aa29099 100644 --- a/pkg/client/cache/store.go +++ b/pkg/client/cache/store.go @@ -18,32 +18,47 @@ package cache import ( "sync" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" ) +// Store is a generic object storage interface. 
Reflector knows how to watch a server +// and update a store. A generic store is provided, which allows Reflector to be used +// as a local caching system, and an LRU store, which allows Reflector to work like a +// queue of items yet to be processed. +type Store interface { + Add(id string, obj interface{}) + Update(id string, obj interface{}) + Delete(id string) + List() []interface{} + Contains() util.StringSet + Get(id string) (item interface{}, exists bool) +} + type cache struct { lock sync.RWMutex items map[string]interface{} } // Add inserts an item into the cache. -func (c *cache) Add(ID string, obj interface{}) { +func (c *cache) Add(id string, obj interface{}) { c.lock.Lock() defer c.lock.Unlock() - c.items[ID] = obj + c.items[id] = obj } // Update sets an item in the cache to its updated state. -func (c *cache) Update(ID string, obj interface{}) { +func (c *cache) Update(id string, obj interface{}) { c.lock.Lock() defer c.lock.Unlock() - c.items[ID] = obj + c.items[id] = obj } // Delete removes an item from the cache. -func (c *cache) Delete(ID string, obj interface{}) { +func (c *cache) Delete(id string) { c.lock.Lock() defer c.lock.Unlock() - delete(c.items, ID) + delete(c.items, id) } // List returns a list of all the items. @@ -58,12 +73,25 @@ func (c *cache) List() []interface{} { return list } -// Get returns the requested item, or sets exists=false. -// Get is completely threadsafe as long as you treat all items as immutable. -func (c *cache) Get(ID string) (item interface{}, exists bool) { +// Contains returns a util.StringSet containing all IDs of stored the items. +// This is a snapshot of a moment in time, and one should keep in mind that +// other go routines can add or remove items after you call this. 
+func (c *cache) Contains() util.StringSet { c.lock.RLock() defer c.lock.RUnlock() - item, exists = c.items[ID] + set := util.StringSet{} + for id := range c.items { + set.Insert(id) + } + return set +} + +// Get returns the requested item, or sets exists=false. +// Get is completely threadsafe as long as you treat all items as immutable. +func (c *cache) Get(id string) (item interface{}, exists bool) { + c.lock.RLock() + defer c.lock.RUnlock() + item, exists = c.items[id] return item, exists } diff --git a/pkg/client/cache/store_test.go b/pkg/client/cache/store_test.go index 9154488c31a..4220d284f4a 100644 --- a/pkg/client/cache/store_test.go +++ b/pkg/client/cache/store_test.go @@ -40,10 +40,12 @@ func doTestStore(t *testing.T, store Store) { t.Errorf("expected %v, got %v", e, a) } } - store.Delete("foo", "qux") + store.Delete("foo") if _, ok := store.Get("foo"); ok { t.Errorf("found deleted item??") } + + // Test List store.Add("a", "b") store.Add("c", "d") store.Add("e", "e") @@ -57,6 +59,15 @@ func doTestStore(t *testing.T, store Store) { if len(found) != 3 { t.Errorf("extra items") } + + // Check that ID list is correct. 
+ ids := store.Contains() + if !ids.HasAll("a", "c", "e") { + t.Errorf("missing items") + } + if len(ids) != 3 { + t.Errorf("extra items") + } } func TestCache(t *testing.T) { From b2349bc66ad55c3034eddee25127e8ee02514c1f Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Mon, 18 Aug 2014 15:02:42 -0700 Subject: [PATCH 4/8] Change scheduler to poll for minions --- plugin/cmd/scheduler/scheduler.go | 87 ++++++++++++++++++++++--------- plugin/pkg/scheduler/scheduler.go | 16 +++--- 2 files changed, 71 insertions(+), 32 deletions(-) diff --git a/plugin/cmd/scheduler/scheduler.go b/plugin/cmd/scheduler/scheduler.go index c2725ca539a..f7727693f87 100644 --- a/plugin/cmd/scheduler/scheduler.go +++ b/plugin/cmd/scheduler/scheduler.go @@ -30,6 +30,8 @@ import ( verflag "github.com/GoogleCloudPlatform/kubernetes/pkg/version/flag" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler" + + "github.com/golang/glog" ) var ( @@ -38,11 +40,11 @@ var ( // storeToMinionLister turns a store into a minion lister. The store must contain (only) minions. type storeToMinionLister struct { - s cache.Store + cache.Store } -func (s storeToMinionLister) List() (machines []string, err error) { - for _, m := range s.s.List() { +func (s *storeToMinionLister) List() (machines []string, err error) { + for _, m := range s.Store.List() { machines = append(machines, m.(*api.Minion).ID) } return machines, nil @@ -50,11 +52,11 @@ func (s storeToMinionLister) List() (machines []string, err error) { // storeToPodLister turns a store into a pod lister. The store must contain (only) pods. 
type storeToPodLister struct { - s cache.Store + cache.Store } -func (s storeToPodLister) ListPods(selector labels.Selector) (pods []api.Pod, err error) { - for _, m := range s.s.List() { +func (s *storeToPodLister) ListPods(selector labels.Selector) (pods []api.Pod, err error) { + for _, m := range s.List() { pod := m.(*api.Pod) if selector.Matches(labels.Set(pod.Labels)) { pods = append(pods, *pod) @@ -63,13 +65,31 @@ func (s storeToPodLister) ListPods(selector labels.Selector) (pods []api.Pod, er return pods, nil } +// minionEnumerator allows a cache.Poller to enumerate items in an api.PodList +type minionEnumerator struct { + *api.MinionList +} + +// Returns the number of items in the pod list. +func (me *minionEnumerator) Len() int { + if me.MinionList == nil { + return 0 + } + return len(me.Items) +} + +// Returns the item (and ID) with the particular index. +func (me *minionEnumerator) Get(index int) (string, interface{}) { + return me.Items[index].ID, &me.Items[index] +} + type binder struct { - kubeClient *client.Client + *client.Client } // Bind just does a POST binding RPC. -func (b binder) Bind(binding *api.Binding) error { - return b.kubeClient.Post().Path("bindings").Body(binding).Do().Error() +func (b *binder) Bind(binding *api.Binding) error { + return b.Post().Path("bindings").Body(binding).Do().Error() } func main() { @@ -90,8 +110,8 @@ func main() { // This query will only find pods with no assigned host. return kubeClient. Get(). - Path("pods"). Path("watch"). + Path("pods"). SelectorParam("fields", labels.Set{"DesiredState.Host": ""}.AsSelector()). UintParam("resourceVersion", resourceVersion). Watch() @@ -104,8 +124,8 @@ func main() { // This query will only find pods that do have an assigned host. return kubeClient. Get(). - Path("pods"). Path("watch"). + Path("pods"). ParseSelectorParam("fields", "DesiredState.Host!="). UintParam("resourceVersion", resourceVersion). Watch() @@ -114,25 +134,44 @@ func main() { // Watch minions. 
// Minions may be listed frequently, so provide a local up-to-date cache. minionCache := cache.NewStore() - cache.NewReflector(func(resourceVersion uint64) (watch.Interface, error) { - // This query will only find pods that do have an assigned host. - return kubeClient. - Get(). - Path("minions"). - Path("watch"). - UintParam("resourceVersion", resourceVersion). - Watch() - }, &api.Minion{}, minionCache).Run() + if false { + // Disable this code until minions support watches. + cache.NewReflector(func(resourceVersion uint64) (watch.Interface, error) { + // This query will only find pods that do have an assigned host. + return kubeClient. + Get(). + Path("watch"). + Path("minions"). + UintParam("resourceVersion", resourceVersion). + Watch() + }, &api.Minion{}, minionCache).Run() + } else { + cache.NewPoller(func() (cache.Enumerator, error) { + // This query will only find pods that do have an assigned host. + list := &api.MinionList{} + err := kubeClient.Get().Path("minions").Do().Into(list) + if err != nil { + return nil, err + } + return &minionEnumerator{list}, nil + }, 10*time.Second, minionCache).Run() + } r := rand.New(rand.NewSource(time.Now().UnixNano())) algo := algorithm.NewRandomFitScheduler( - storeToPodLister{podCache}, r) + &storeToPodLister{podCache}, r) s := scheduler.New(&scheduler.Config{ - MinionLister: storeToMinionLister{minionCache}, + MinionLister: &storeToMinionLister{minionCache}, Algorithm: algo, - NextPod: func() *api.Pod { return podQueue.Pop().(*api.Pod) }, - Binder: binder{kubeClient}, + Binder: &binder{kubeClient}, + NextPod: func() *api.Pod { + return podQueue.Pop().(*api.Pod) + }, + Error: func(pod *api.Pod, err error) { + glog.Errorf("Error scheduling %v: %v; retrying", pod.ID, err) + podQueue.Add(pod.ID, pod) + }, }) s.Run() diff --git a/plugin/pkg/scheduler/scheduler.go b/plugin/pkg/scheduler/scheduler.go index 0b5234ec700..2c6f81483e2 100644 --- a/plugin/pkg/scheduler/scheduler.go +++ b/plugin/pkg/scheduler/scheduler.go @@ -31,7 
+31,7 @@ type Binder interface { // Scheduler watches for new unscheduled pods. It attempts to find // minions that they fit on and writes bindings back to the api server. type Scheduler struct { - c *Config + config *Config } type Config struct { @@ -53,28 +53,28 @@ type Config struct { // New returns a new scheduler. func New(c *Config) *Scheduler { s := &Scheduler{ - c: c, + config: c, } return s } -// Run begins watching and scheduling. +// Run begins watching and scheduling. It starts a goroutine and returns immediately. func (s *Scheduler) Run() { go util.Forever(s.scheduleOne, 0) } func (s *Scheduler) scheduleOne() { - pod := s.c.NextPod() - dest, err := s.c.Algorithm.Schedule(*pod, s.c.MinionLister) + pod := s.config.NextPod() + dest, err := s.config.Algorithm.Schedule(*pod, s.config.MinionLister) if err != nil { - s.c.Error(pod, err) + s.config.Error(pod, err) return } b := &api.Binding{ PodID: pod.ID, Host: dest, } - if err := s.c.Binder.Bind(b); err != nil { - s.c.Error(pod, err) + if err := s.config.Binder.Bind(b); err != nil { + s.config.Error(pod, err) } } From 79f60da6c47ec70497af43fbd97dd35edb55eb87 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Mon, 18 Aug 2014 16:37:51 -0700 Subject: [PATCH 5/8] add salt files to make scheduler run --- cluster/saltbase/salt/scheduler/default | 6 + cluster/saltbase/salt/scheduler/init.sls | 91 +++++++++++++ cluster/saltbase/salt/scheduler/initd | 120 ++++++++++++++++++ .../saltbase/salt/scheduler/scheduler.service | 11 ++ cluster/saltbase/salt/top.sls | 1 + release/master-release-install.sh | 3 + 6 files changed, 232 insertions(+) create mode 100644 cluster/saltbase/salt/scheduler/default create mode 100644 cluster/saltbase/salt/scheduler/init.sls create mode 100644 cluster/saltbase/salt/scheduler/initd create mode 100644 cluster/saltbase/salt/scheduler/scheduler.service diff --git a/cluster/saltbase/salt/scheduler/default b/cluster/saltbase/salt/scheduler/default new file mode 100644 index 
00000000000..e4a89682e62 --- /dev/null +++ b/cluster/saltbase/salt/scheduler/default @@ -0,0 +1,6 @@ +{% set daemon_args = "$DAEMON_ARGS" %} +{% if grains['os_family'] == 'RedHat' %} + {% set daemon_args = "" %} +{% endif %} +{% set master="-master=127.0.0.1:8080" %} +DAEMON_ARGS="{{daemon_args}} {{master}}" diff --git a/cluster/saltbase/salt/scheduler/init.sls b/cluster/saltbase/salt/scheduler/init.sls new file mode 100644 index 00000000000..67835e6bdbf --- /dev/null +++ b/cluster/saltbase/salt/scheduler/init.sls @@ -0,0 +1,91 @@ +{% set root = '/var/src/scheduler' %} +{% set package = 'github.com/GoogleCloudPlatform/kubernetes' %} +{% set package_dir = root + '/src/' + package %} +{% if grains['os_family'] == 'RedHat' %} +{% set environment_file = '/etc/sysconfig/scheduler' %} +{% else %} +{% set environment_file = '/etc/default/scheduler' %} +{% endif %} + +{{ package_dir }}: + file.recurse: + - source: salt://scheduler/go + - user: root + {% if grains['os_family'] == 'RedHat' %} + - group: root + {% else %} + - group: staff + {% endif %} + - dir_mode: 775 + - file_mode: 664 + - makedirs: True + - recurse: + - user + - group + - mode + +{{ environment_file }}: + file.managed: + - source: salt://scheduler/default + - template: jinja + - user: root + - group: root + - mode: 644 + +scheduler-build: + cmd.run: + - cwd: {{ root }} + - names: + - go build {{ package }}/plugin/cmd/scheduler + - env: + - PATH: {{ grains['path'] }}:/usr/local/bin + - GOPATH: {{ root }}:{{ package_dir }}/Godeps/_workspace + - require: + - file: {{ package_dir }} + +/usr/local/bin/scheduler: + file.symlink: + - target: {{ root }}/scheduler + - watch: + - cmd: scheduler-build + +{% if grains['os_family'] == 'RedHat' %} + +/usr/lib/systemd/system/scheduler.service: + file.managed: + - source: salt://scheduler/scheduler.service + - user: root + - group: root + +{% else %} + +/etc/init.d/scheduler: + file.managed: + - source: salt://scheduler/initd + - user: root + - group: root + - mode: 755 
+ +{% endif %} + +scheduler: + group.present: + - system: True + user.present: + - system: True + - gid_from_name: True + - shell: /sbin/nologin + - home: /var/scheduler + - require: + - group: scheduler + service.running: + - enable: True + - watch: + - cmd: scheduler-build + - file: /usr/local/bin/scheduler + - file: {{ environment_file }} +{% if grains['os_family'] != 'RedHat' %} + - file: /etc/init.d/scheduler +{% endif %} + + diff --git a/cluster/saltbase/salt/scheduler/initd b/cluster/saltbase/salt/scheduler/initd new file mode 100644 index 00000000000..ddca3531368 --- /dev/null +++ b/cluster/saltbase/salt/scheduler/initd @@ -0,0 +1,120 @@ +#!/bin/bash +# +### BEGIN INIT INFO +# Provides: scheduler +# Required-Start: $local_fs $network $syslog +# Required-Stop: +# Default-Start: 2 3 4 5 +# Default-Stop: 0 1 6 +# Short-Description: The Kubernetes scheduler plugin +# Description: +# The Kubernetes scheduler plugin is responsible for scheduling pods on +# available minions. +### END INIT INFO + + +# PATH should only include /usr/* if it runs after the mountnfs.sh script +PATH=/sbin:/usr/sbin:/bin:/usr/bin +DESC="The Kubernetes scheduler plugin" +NAME=scheduler +DAEMON=/usr/local/bin/scheduler +DAEMON_ARGS=" --master=127.0.0.1:8080" +DAEMON_LOG_FILE=/var/log/$NAME.log +PIDFILE=/var/run/$NAME.pid +SCRIPTNAME=/etc/init.d/$NAME +DAEMON_USER=scheduler + +# Exit if the package is not installed +[ -x "$DAEMON" ] || exit 0 + +# Read configuration variable file if it is present +[ -r /etc/default/$NAME ] && . /etc/default/$NAME + +# Define LSB log_* functions. +# Depend on lsb-base (>= 3.2-14) to ensure that this file is present +# and status_of_proc is working. +. 
/lib/lsb/init-functions + +# +# Function that starts the daemon/service +# +do_start() +{ + # Return + # 0 if daemon has been started + # 1 if daemon was already running + # 2 if daemon could not be started + start-stop-daemon --start --quiet --background --no-close \ + --make-pidfile --pidfile $PIDFILE \ + --exec $DAEMON -c $DAEMON_USER --test > /dev/null \ + || return 1 + start-stop-daemon --start --quiet --background --no-close \ + --make-pidfile --pidfile $PIDFILE \ + --exec $DAEMON -c $DAEMON_USER -- \ + $DAEMON_ARGS >> $DAEMON_LOG_FILE 2>&1 \ + || return 2 +} + +# +# Function that stops the daemon/service +# +do_stop() +{ + # Return + # 0 if daemon has been stopped + # 1 if daemon was already stopped + # 2 if daemon could not be stopped + # other if a failure occurred + start-stop-daemon --stop --quiet --retry=TERM/30/KILL/5 --pidfile $PIDFILE --exec $DAEMON + RETVAL="$?" + [ "$RETVAL" = 2 ] && return 2 + # Many daemons don't delete their pidfiles when they exit. + rm -f $PIDFILE + return "$RETVAL" +} + + +case "$1" in + start) + log_daemon_msg "Starting $DESC" "$NAME" + do_start + case "$?" in + 0|1) log_end_msg 0 || exit 0 ;; + 2) verblog_end_msg 1 || exit 1 ;; + esac + ;; + stop) + log_daemon_msg "Stopping $DESC" "$NAME" + do_stop + case "$?" in + 0|1) log_end_msg 0 ;; + 2) exit 1 ;; + esac + ;; + status) + status_of_proc -p $PIDFILE "$DAEMON" "$NAME" && exit 0 || exit $? + ;; + + restart|force-reload) + log_daemon_msg "Restarting $DESC" "$NAME" + do_stop + case "$?" in + 0|1) + do_start + case "$?" 
in + 0) log_end_msg 0 ;; + 1) log_end_msg 1 ;; # Old process is still running + *) log_end_msg 1 ;; # Failed to start + esac + ;; + *) + # Failed to stop + log_end_msg 1 + ;; + esac + ;; + *) + echo "Usage: $SCRIPTNAME {start|stop|status|restart|force-reload}" >&2 + exit 3 + ;; +esac diff --git a/cluster/saltbase/salt/scheduler/scheduler.service b/cluster/saltbase/salt/scheduler/scheduler.service new file mode 100644 index 00000000000..ffd8562aab5 --- /dev/null +++ b/cluster/saltbase/salt/scheduler/scheduler.service @@ -0,0 +1,11 @@ +[Unit] +Description=Kubernetes Scheduler Plugin +Documentation=https://github.com/GoogleCloudPlatform/kubernetes + +[Service] +Type=simple +EnvironmentFile=-/etc/sysconfig/scheduler +ExecStart=/usr/local/bin/scheduler "$DAEMON_ARGS" + +[Install] +WantedBy=multi-user.target diff --git a/cluster/saltbase/salt/top.sls b/cluster/saltbase/salt/top.sls index f6d51278e4c..2c93627410a 100644 --- a/cluster/saltbase/salt/top.sls +++ b/cluster/saltbase/salt/top.sls @@ -17,4 +17,5 @@ base: - etcd - apiserver - controller-manager + - scheduler - nginx diff --git a/release/master-release-install.sh b/release/master-release-install.sh index be87e17a838..01a0d8812b5 100755 --- a/release/master-release-install.sh +++ b/release/master-release-install.sh @@ -37,6 +37,9 @@ cp -R --preserve=mode $RELEASE_BASE/src/go/* /srv/salt/kube-proxy/go mkdir -p /srv/salt/controller-manager/go cp -R --preserve=mode $RELEASE_BASE/src/go/* /srv/salt/controller-manager/go +mkdir -p /srv/salt/scheduler/go +cp -R --preserve=mode $RELEASE_BASE/src/go/* /srv/salt/scheduler/go + mkdir -p /srv/salt/kubelet/go cp -R --preserve=mode $RELEASE_BASE/src/go/* /srv/salt/kubelet/go From 8a9eaf911fc364caddddfb620622eb5d1afaf527 Mon Sep 17 00:00:00 2001 From: Daniel Smith Date: Wed, 20 Aug 2014 14:34:55 -0700 Subject: [PATCH 6/8] For testability & reuse, move scheduler setup into its own package. 
--- pkg/util/fake_handler.go | 13 +- plugin/cmd/scheduler/scheduler.go | 139 +----------- plugin/pkg/scheduler/factory/factory.go | 177 +++++++++++++++ plugin/pkg/scheduler/factory/factory_test.go | 224 +++++++++++++++++++ 4 files changed, 416 insertions(+), 137 deletions(-) create mode 100644 plugin/pkg/scheduler/factory/factory.go create mode 100644 plugin/pkg/scheduler/factory/factory_test.go diff --git a/pkg/util/fake_handler.go b/pkg/util/fake_handler.go index 624e226b92a..97f4ea3e3fe 100644 --- a/pkg/util/fake_handler.go +++ b/pkg/util/fake_handler.go @@ -19,6 +19,8 @@ package util import ( "io/ioutil" "net/http" + "net/url" + "reflect" ) // TestInterface is a simple interface providing Errorf, to make injection for @@ -57,8 +59,15 @@ func (f *FakeHandler) ServeHTTP(response http.ResponseWriter, request *http.Requ // ValidateRequest verifies that FakeHandler received a request with expected path, method, and body. func (f FakeHandler) ValidateRequest(t TestInterface, expectedPath, expectedMethod string, body *string) { - if f.RequestReceived.URL.Path != expectedPath { - t.Errorf("Unexpected request path for request %#v, received: %q, expected: %q", f.RequestReceived, f.RequestReceived.URL.Path, expectedPath) + expectURL, err := url.Parse(expectedPath) + if err != nil { + t.Errorf("Couldn't parse %v as a URL.", expectedPath) + } + if f.RequestReceived.URL.Path != expectURL.Path { + t.Errorf("Unexpected request path for request %#v, received: %q, expected: %q", f.RequestReceived, f.RequestReceived.URL.Path, expectURL.Path) + } + if e, a := expectURL.Query(), f.RequestReceived.URL.Query(); !reflect.DeepEqual(e, a) { + t.Errorf("Unexpected query for request %#v, received: %q, expected: %q", f.RequestReceived, a, e) } if f.RequestReceived.Method != expectedMethod { t.Errorf("Unexpected method: %q, expected: %q", f.RequestReceived.Method, expectedMethod) diff --git a/plugin/cmd/scheduler/scheduler.go b/plugin/cmd/scheduler/scheduler.go index 
f7727693f87..05242cc8ddc 100644 --- a/plugin/cmd/scheduler/scheduler.go +++ b/plugin/cmd/scheduler/scheduler.go @@ -18,80 +18,18 @@ package main import ( "flag" - "math/rand" - "time" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" - "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" - "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" - algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" verflag "github.com/GoogleCloudPlatform/kubernetes/pkg/version/flag" - "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler" - - "github.com/golang/glog" + "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory" ) var ( master = flag.String("master", "", "The address of the Kubernetes API server") ) -// storeToMinionLister turns a store into a minion lister. The store must contain (only) minions. -type storeToMinionLister struct { - cache.Store -} - -func (s *storeToMinionLister) List() (machines []string, err error) { - for _, m := range s.Store.List() { - machines = append(machines, m.(*api.Minion).ID) - } - return machines, nil -} - -// storeToPodLister turns a store into a pod lister. The store must contain (only) pods. -type storeToPodLister struct { - cache.Store -} - -func (s *storeToPodLister) ListPods(selector labels.Selector) (pods []api.Pod, err error) { - for _, m := range s.List() { - pod := m.(*api.Pod) - if selector.Matches(labels.Set(pod.Labels)) { - pods = append(pods, *pod) - } - } - return pods, nil -} - -// minionEnumerator allows a cache.Poller to enumerate items in an api.PodList -type minionEnumerator struct { - *api.MinionList -} - -// Returns the number of items in the pod list. 
-func (me *minionEnumerator) Len() int { - if me.MinionList == nil { - return 0 - } - return len(me.Items) -} - -// Returns the item (and ID) with the particular index. -func (me *minionEnumerator) Get(index int) (string, interface{}) { - return me.Items[index].ID, &me.Items[index] -} - -type binder struct { - *client.Client -} - -// Bind just does a POST binding RPC. -func (b *binder) Bind(binding *api.Binding) error { - return b.Post().Path("bindings").Body(binding).Do().Error() -} - func main() { flag.Parse() util.InitLogs() @@ -99,81 +37,12 @@ func main() { verflag.PrintAndExitIfRequested() - // This function is long because we inject all the dependencies into scheduler here. - // TODO: security story for plugins! kubeClient := client.New("http://"+*master, nil) - // Watch and queue pods that need scheduling. - podQueue := cache.NewFIFO() - cache.NewReflector(func(resourceVersion uint64) (watch.Interface, error) { - // This query will only find pods with no assigned host. - return kubeClient. - Get(). - Path("watch"). - Path("pods"). - SelectorParam("fields", labels.Set{"DesiredState.Host": ""}.AsSelector()). - UintParam("resourceVersion", resourceVersion). - Watch() - }, &api.Pod{}, podQueue).Run() - - // Watch and cache all running pods. Scheduler needs to find all pods - // so it knows where it's safe to place a pod. Cache this locally. - podCache := cache.NewStore() - cache.NewReflector(func(resourceVersion uint64) (watch.Interface, error) { - // This query will only find pods that do have an assigned host. - return kubeClient. - Get(). - Path("watch"). - Path("pods"). - ParseSelectorParam("fields", "DesiredState.Host!="). - UintParam("resourceVersion", resourceVersion). - Watch() - }, &api.Pod{}, podCache).Run() - - // Watch minions. - // Minions may be listed frequently, so provide a local up-to-date cache. - minionCache := cache.NewStore() - if false { - // Disable this code until minions support watches. 
- cache.NewReflector(func(resourceVersion uint64) (watch.Interface, error) { - // This query will only find pods that do have an assigned host. - return kubeClient. - Get(). - Path("watch"). - Path("minions"). - UintParam("resourceVersion", resourceVersion). - Watch() - }, &api.Minion{}, minionCache).Run() - } else { - cache.NewPoller(func() (cache.Enumerator, error) { - // This query will only find pods that do have an assigned host. - list := &api.MinionList{} - err := kubeClient.Get().Path("minions").Do().Into(list) - if err != nil { - return nil, err - } - return &minionEnumerator{list}, nil - }, 10*time.Second, minionCache).Run() - } - - r := rand.New(rand.NewSource(time.Now().UnixNano())) - algo := algorithm.NewRandomFitScheduler( - &storeToPodLister{podCache}, r) - - s := scheduler.New(&scheduler.Config{ - MinionLister: &storeToMinionLister{minionCache}, - Algorithm: algo, - Binder: &binder{kubeClient}, - NextPod: func() *api.Pod { - return podQueue.Pop().(*api.Pod) - }, - Error: func(pod *api.Pod, err error) { - glog.Errorf("Error scheduling %v: %v; retrying", pod.ID, err) - podQueue.Add(pod.ID, pod) - }, - }) - + configFactory := &factory.ConfigFactory{Client: kubeClient} + config := configFactory.Create() + s := scheduler.New(config) s.Run() select {} diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go new file mode 100644 index 00000000000..cbea5893e92 --- /dev/null +++ b/plugin/pkg/scheduler/factory/factory.go @@ -0,0 +1,177 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package factory can set up a scheduler. This code is here instead of +// plugin/cmd/scheduler for both testability and reuse. +package factory + +import ( + "math/rand" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler" + + "github.com/golang/glog" +) + +// ConfigFactory knows how to fill out a scheduler config with its support functions. +type ConfigFactory struct { + Client *client.Client +} + +// Create creates a scheduler and all support functions. +func (factory *ConfigFactory) Create() *scheduler.Config { + // Watch and queue pods that need scheduling. + podQueue := cache.NewFIFO() + cache.NewReflector(factory.createUnassignedPodWatch, &api.Pod{}, podQueue).Run() + + // Watch and cache all running pods. Scheduler needs to find all pods + // so it knows where it's safe to place a pod. Cache this locally. + podCache := cache.NewStore() + cache.NewReflector(factory.createAssignedPodWatch, &api.Pod{}, podCache).Run() + + // Watch minions. + // Minions may be listed frequently, so provide a local up-to-date cache. + minionCache := cache.NewStore() + if false { + // Disable this code until minions support watches. 
+ cache.NewReflector(factory.createMinionWatch, &api.Minion{}, minionCache).Run() + } else { + cache.NewPoller(factory.pollMinions, 10*time.Second, minionCache).Run() + } + + r := rand.New(rand.NewSource(time.Now().UnixNano())) + algo := algorithm.NewRandomFitScheduler( + &storeToPodLister{podCache}, r) + + return &scheduler.Config{ + MinionLister: &storeToMinionLister{minionCache}, + Algorithm: algo, + Binder: &binder{factory.Client}, + NextPod: func() *api.Pod { + return podQueue.Pop().(*api.Pod) + }, + Error: factory.defaultErrorFunc, + } +} + +// createUnassignedPodWatch starts a watch that finds all pods that need to be +// scheduled. +func (factory *ConfigFactory) createUnassignedPodWatch(resourceVersion uint64) (watch.Interface, error) { + return factory.Client. + Get(). + Path("watch"). + Path("pods"). + SelectorParam("fields", labels.Set{"DesiredState.Host": ""}.AsSelector()). + UintParam("resourceVersion", resourceVersion). + Watch() +} + +// createAssignedPodWatch starts a watch that finds all pods that are +// already scheduled. +func (factory *ConfigFactory) createAssignedPodWatch(resourceVersion uint64) (watch.Interface, error) { + return factory.Client. + Get(). + Path("watch"). + Path("pods"). + ParseSelectorParam("fields", "DesiredState.Host!="). + UintParam("resourceVersion", resourceVersion). + Watch() +} + +// createMinionWatch starts a watch that gets all changes to minions. +func (factory *ConfigFactory) createMinionWatch(resourceVersion uint64) (watch.Interface, error) { + return factory.Client. + Get(). + Path("watch"). + Path("minions"). + UintParam("resourceVersion", resourceVersion). + Watch() +} + +// pollMinions lists all minions and returns an enumerator for cache.Poller.
+func (factory *ConfigFactory) pollMinions() (cache.Enumerator, error) { + list := &api.MinionList{} + err := factory.Client.Get().Path("minions").Do().Into(list) + if err != nil { + return nil, err + } + return &minionEnumerator{list}, nil +} + +func (factory *ConfigFactory) defaultErrorFunc(pod *api.Pod, err error) { + glog.Errorf("Error scheduling %v: %v; retrying", pod.ID, err) +} + +// storeToMinionLister turns a store into a minion lister. The store must contain (only) minions. +type storeToMinionLister struct { + cache.Store +} + +func (s *storeToMinionLister) List() (machines []string, err error) { + for _, m := range s.Store.List() { + machines = append(machines, m.(*api.Minion).ID) + } + return machines, nil +} + +// storeToPodLister turns a store into a pod lister. The store must contain (only) pods. +type storeToPodLister struct { + cache.Store +} + +func (s *storeToPodLister) ListPods(selector labels.Selector) (pods []api.Pod, err error) { + for _, m := range s.List() { + pod := m.(*api.Pod) + if selector.Matches(labels.Set(pod.Labels)) { + pods = append(pods, *pod) + } + } + return pods, nil +} + +// minionEnumerator allows a cache.Poller to enumerate items in an api.MinionList +type minionEnumerator struct { + *api.MinionList +} + +// Returns the number of items in the minion list. +func (me *minionEnumerator) Len() int { + if me.MinionList == nil { + return 0 + } + return len(me.Items) +} + +// Returns the item (and ID) with the particular index. +func (me *minionEnumerator) Get(index int) (string, interface{}) { + return me.Items[index].ID, &me.Items[index] +} + +type binder struct { + *client.Client +} + +// Bind just does a POST binding RPC.
+func (b *binder) Bind(binding *api.Binding) error { + return b.Post().Path("bindings").Body(binding).Do().Error() +} diff --git a/plugin/pkg/scheduler/factory/factory_test.go b/plugin/pkg/scheduler/factory/factory_test.go new file mode 100644 index 00000000000..c542423526f --- /dev/null +++ b/plugin/pkg/scheduler/factory/factory_test.go @@ -0,0 +1,224 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package factory + +import ( + "net/http/httptest" + "reflect" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +func TestCreate(t *testing.T) { + handler := util.FakeHandler{ + StatusCode: 500, + ResponseBody: "", + T: t, + } + server := httptest.NewServer(&handler) + factory := ConfigFactory{client.New(server.URL, nil)} + factory.Create() +} + +func TestCreateWatches(t *testing.T) { + factory := ConfigFactory{nil} + table := []struct { + rv uint64 + location string + watchFactory func(rv uint64) (watch.Interface, error) + }{ + // Minion watch + { + rv: 0, + location: "/api/v1beta1/watch/minions?resourceVersion=0", + watchFactory: factory.createMinionWatch, + }, { + rv: 42, + location: 
"/api/v1beta1/watch/minions?resourceVersion=42", + watchFactory: factory.createMinionWatch, + }, + // Assigned pod watches + { + rv: 0, + location: "/api/v1beta1/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=0", + watchFactory: factory.createAssignedPodWatch, + }, { + rv: 42, + location: "/api/v1beta1/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=42", + watchFactory: factory.createAssignedPodWatch, + }, + // Unassigned pod watches + { + rv: 0, + location: "/api/v1beta1/watch/pods?fields=DesiredState.Host%3D&resourceVersion=0", + watchFactory: factory.createUnassignedPodWatch, + }, { + rv: 42, + location: "/api/v1beta1/watch/pods?fields=DesiredState.Host%3D&resourceVersion=42", + watchFactory: factory.createUnassignedPodWatch, + }, + } + + for _, item := range table { + handler := util.FakeHandler{ + StatusCode: 500, + ResponseBody: "", + T: t, + } + server := httptest.NewServer(&handler) + factory.Client = client.New(server.URL, nil) + // This test merely tests that the correct request is made. 
+ item.watchFactory(item.rv) + handler.ValidateRequest(t, item.location, "GET", nil) + } +} + +func TestPollMinions(t *testing.T) { + table := []struct { + minions []api.Minion + }{ + { + minions: []api.Minion{ + {JSONBase: api.JSONBase{ID: "foo"}}, + {JSONBase: api.JSONBase{ID: "bar"}}, + }, + }, + } + + for _, item := range table { + ml := &api.MinionList{Items: item.minions} + handler := util.FakeHandler{ + StatusCode: 200, + ResponseBody: api.EncodeOrDie(ml), + T: t, + } + server := httptest.NewServer(&handler) + cf := ConfigFactory{client.New(server.URL, nil)} + + ce, err := cf.pollMinions() + if err != nil { + t.Errorf("Unexpected error: %v", err) + continue + } + handler.ValidateRequest(t, "/api/v1beta1/minions", "GET", nil) + + if e, a := len(item.minions), ce.Len(); e != a { + t.Errorf("Expected %v, got %v", e, a) + } + } +} + +func TestStoreToMinionLister(t *testing.T) { + store := cache.NewStore() + ids := util.NewStringSet("foo", "bar", "baz") + for id := range ids { + store.Add(id, &api.Minion{JSONBase: api.JSONBase{ID: id}}) + } + sml := storeToMinionLister{store} + + got, err := sml.List() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !ids.HasAll(got...) 
|| len(got) != len(ids) { + t.Errorf("Expected %v, got %v", ids, got) + } +} + +func TestStoreToPodLister(t *testing.T) { + store := cache.NewStore() + ids := []string{"foo", "bar", "baz"} + for _, id := range ids { + store.Add(id, &api.Pod{ + JSONBase: api.JSONBase{ID: id}, + Labels: map[string]string{"name": id}, + }) + } + spl := storeToPodLister{store} + + for _, id := range ids { + got, err := spl.ListPods(labels.Set{"name": id}.AsSelector()) + if err != nil { + t.Errorf("Unexpected error: %v", err) + continue + } + if e, a := 1, len(got); e != a { + t.Errorf("Expected %v, got %v", e, a) + continue + } + if e, a := id, got[0].ID; e != a { + t.Errorf("Expected %v, got %v", e, a) + continue + } + } +} + +func TestMinionEnumerator(t *testing.T) { + testList := &api.MinionList{ + Items: []api.Minion{ + {JSONBase: api.JSONBase{ID: "foo"}}, + {JSONBase: api.JSONBase{ID: "bar"}}, + {JSONBase: api.JSONBase{ID: "baz"}}, + }, + } + me := minionEnumerator{testList} + + if e, a := 3, me.Len(); e != a { + t.Fatalf("expected %v, got %v", e, a) + } + for i := range testList.Items { + gotID, gotObj := me.Get(i) + if e, a := testList.Items[i].ID, gotID; e != a { + t.Errorf("Expected %v, got %v", e, a) + } + if e, a := &testList.Items[i], gotObj; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %v#", e, a) + } + } +} + +func TestBind(t *testing.T) { + table := []struct { + binding *api.Binding + }{ + {binding: &api.Binding{PodID: "foo", Host: "foohost.kubernetes.mydomain.com"}}, + } + + for _, item := range table { + handler := util.FakeHandler{ + StatusCode: 200, + ResponseBody: "", + T: t, + } + server := httptest.NewServer(&handler) + b := binder{client.New(server.URL, nil)} + + err := b.Bind(item.binding) + if err != nil { + t.Errorf("Unexpected error: %v", err) + continue + } + expectedBody := api.EncodeOrDie(item.binding) + handler.ValidateRequest(t, "/api/v1beta1/bindings", "POST", &expectedBody) + } +} From 3d47c8bad13504dbc80ffbc751937796b5d3f087 Mon Sep 17 
00:00:00 2001 From: Daniel Smith Date: Wed, 20 Aug 2014 15:30:09 -0700 Subject: [PATCH 7/8] Fix up client tests now that we check the query in FakeHandler --- pkg/client/client_test.go | 6 +++++- pkg/client/request_test.go | 22 +++++----------------- 2 files changed, 10 insertions(+), 18 deletions(-) diff --git a/pkg/client/client_test.go b/pkg/client/client_test.go index 1ddea772b1e..f1a4a6bd24a 100644 --- a/pkg/client/client_test.go +++ b/pkg/client/client_test.go @@ -343,13 +343,17 @@ func (c *testClient) Validate(t *testing.T, received interface{}, err error) { } requestBody := body(c.Request.Body, c.Request.RawBody) + actualQuery := c.handler.RequestReceived.URL.Query() + // We check the query manually, so blank it out so that FakeHandler.ValidateRequest + // won't check it. + c.handler.RequestReceived.URL.RawQuery = "" c.handler.ValidateRequest(t, makeURL(c.Request.Path), c.Request.Method, requestBody) for key, values := range c.Request.Query { validator, ok := c.QueryValidator[key] if !ok { validator = func(a, b string) bool { return a == b } } - observed := c.handler.RequestReceived.URL.Query().Get(key) + observed := actualQuery.Get(key) if !validator(values[0], observed) { t.Errorf("Unexpected query arg for key: %s. 
Expected %s, Received %s", key, values[0], observed) } diff --git a/pkg/client/request_test.go b/pkg/client/request_test.go index 5e5ddc4cc55..d27cde53545 100644 --- a/pkg/client/request_test.go +++ b/pkg/client/request_test.go @@ -62,10 +62,7 @@ func TestDoRequestNewWay(t *testing.T) { } else if !reflect.DeepEqual(obj, expectedObj) { t.Errorf("Expected: %#v, got %#v", expectedObj, obj) } - fakeHandler.ValidateRequest(t, "/api/v1beta1/foo/bar/baz", "POST", &reqBody) - if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo" { - t.Errorf("Unexpected query: %v", fakeHandler.RequestReceived.URL.RawQuery) - } + fakeHandler.ValidateRequest(t, "/api/v1beta1/foo/bar/baz?labels=name%3Dfoo", "POST", &reqBody) if fakeHandler.RequestReceived.Header["Authorization"] == nil { t.Errorf("Request is missing authorization header: %#v", *fakeHandler.RequestReceived) } @@ -88,7 +85,7 @@ func TestDoRequestNewWayReader(t *testing.T) { Path("foo/bar"). Path("baz"). SelectorParam("labels", labels.Set{"name": "foo"}.AsSelector()). - Sync(false). + Sync(true). Timeout(time.Second). Body(bytes.NewBuffer(reqBodyExpected)). 
Do().Get() @@ -102,10 +99,7 @@ func TestDoRequestNewWayReader(t *testing.T) { t.Errorf("Expected: %#v, got %#v", expectedObj, obj) } tmpStr := string(reqBodyExpected) - fakeHandler.ValidateRequest(t, "/api/v1beta1/foo/bar/baz", "POST", &tmpStr) - if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo" { - t.Errorf("Unexpected query: %v", fakeHandler.RequestReceived.URL.RawQuery) - } + fakeHandler.ValidateRequest(t, "/api/v1beta1/foo/bar/baz?labels=name%3Dfoo&sync=true&timeout=1s", "POST", &tmpStr) if fakeHandler.RequestReceived.Header["Authorization"] == nil { t.Errorf("Request is missing authorization header: %#v", *fakeHandler.RequestReceived) } @@ -141,10 +135,7 @@ func TestDoRequestNewWayObj(t *testing.T) { t.Errorf("Expected: %#v, got %#v", expectedObj, obj) } tmpStr := string(reqBodyExpected) - fakeHandler.ValidateRequest(t, "/api/v1beta1/foo/bar/baz", "POST", &tmpStr) - if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo" { - t.Errorf("Unexpected query: %v", fakeHandler.RequestReceived.URL.RawQuery) - } + fakeHandler.ValidateRequest(t, "/api/v1beta1/foo/bar/baz?labels=name%3Dfoo", "POST", &tmpStr) if fakeHandler.RequestReceived.Header["Authorization"] == nil { t.Errorf("Request is missing authorization header: %#v", *fakeHandler.RequestReceived) } @@ -194,10 +185,7 @@ func TestDoRequestNewWayFile(t *testing.T) { t.Errorf("Expected: %#v, got %#v", expectedObj, obj) } tmpStr := string(reqBodyExpected) - fakeHandler.ValidateRequest(t, "/api/v1beta1/foo/bar/baz", "POST", &tmpStr) - if fakeHandler.RequestReceived.URL.RawQuery != "labels=name%3Dfoo" { - t.Errorf("Unexpected query: %v", fakeHandler.RequestReceived.URL.RawQuery) - } + fakeHandler.ValidateRequest(t, "/api/v1beta1/foo/bar/baz?labels=name%3Dfoo", "POST", &tmpStr) if fakeHandler.RequestReceived.Header["Authorization"] == nil { t.Errorf("Request is missing authorization header: %#v", *fakeHandler.RequestReceived) } From 03cd22d4f4a682dc1a6d87f30bec08d45d4e7ef8 Mon Sep 17 00:00:00 
2001 From: Daniel Smith Date: Wed, 20 Aug 2014 15:03:32 -0700 Subject: [PATCH 8/8] Handle errors slightly more intelligently --- plugin/pkg/scheduler/factory/factory.go | 26 ++++++++++++++-- plugin/pkg/scheduler/factory/factory_test.go | 31 ++++++++++++++++++++ 2 files changed, 54 insertions(+), 3 deletions(-) diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index cbea5893e92..d140b1904fd 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -27,6 +27,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler" @@ -70,7 +71,7 @@ func (factory *ConfigFactory) Create() *scheduler.Config { NextPod: func() *api.Pod { return podQueue.Pop().(*api.Pod) }, - Error: factory.defaultErrorFunc, + Error: factory.makeDefaultErrorFunc(podQueue), } } @@ -118,8 +119,27 @@ func (factory *ConfigFactory) pollMinions() (cache.Enumerator, error) { return &minionEnumerator{list}, nil } -func (factory *ConfigFactory) defaultErrorFunc(pod *api.Pod, err error) { - glog.Errorf("Error scheduling %v: %v; retrying", pod.ID, err) +func (factory *ConfigFactory) makeDefaultErrorFunc(podQueue *cache.FIFO) func(pod *api.Pod, err error) { + return func(pod *api.Pod, err error) { + glog.Errorf("Error scheduling %v: %v; retrying", pod.ID, err) + + // Retry asynchronously. + // Note that this is extremely rudimentary and we need a more real error handling path. + go func() { + defer util.HandleCrash() + podID := pod.ID + // Get the pod again; it may have changed/been scheduled already. 
+ pod = &api.Pod{} + err := factory.Client.Get().Path("pods").Path(podID).Do().Into(pod) + if err != nil { + glog.Errorf("Error getting pod %v for retry: %v; abandoning", podID, err) + return + } + if pod.DesiredState.Host == "" { + podQueue.Add(pod.ID, pod) + } + }() + } } // storeToMinionLister turns a store into a minion lister. The store must contain (only) minions. diff --git a/plugin/pkg/scheduler/factory/factory_test.go b/plugin/pkg/scheduler/factory/factory_test.go index c542423526f..fb268d7bef2 100644 --- a/plugin/pkg/scheduler/factory/factory_test.go +++ b/plugin/pkg/scheduler/factory/factory_test.go @@ -20,6 +20,7 @@ import ( "net/http/httptest" "reflect" "testing" + "time" "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" @@ -128,6 +129,36 @@ func TestPollMinions(t *testing.T) { } } +func TestDefaultErrorFunc(t *testing.T) { + testPod := &api.Pod{JSONBase: api.JSONBase{ID: "foo"}} + handler := util.FakeHandler{ + StatusCode: 200, + ResponseBody: api.EncodeOrDie(testPod), + T: t, + } + server := httptest.NewServer(&handler) + factory := ConfigFactory{client.New(server.URL, nil)} + queue := cache.NewFIFO() + errFunc := factory.makeDefaultErrorFunc(queue) + + errFunc(testPod, nil) + for { + // This is a terrible way to do this but I plan on replacing this + // whole error handling system in the future. The test will time + // out if something doesn't work. + time.Sleep(10 * time.Millisecond) + got, exists := queue.Get("foo") + if !exists { + continue + } + handler.ValidateRequest(t, "/api/v1beta1/pods/foo", "GET", nil) + if e, a := testPod, got; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %v, got %v", e, a) + } + break + } +} + func TestStoreToMinionLister(t *testing.T) { store := cache.NewStore() ids := util.NewStringSet("foo", "bar", "baz")