diff --git a/pkg/util/fake_handler.go b/pkg/util/fake_handler.go index 624e226b92a..97f4ea3e3fe 100644 --- a/pkg/util/fake_handler.go +++ b/pkg/util/fake_handler.go @@ -19,6 +19,8 @@ package util import ( "io/ioutil" "net/http" + "net/url" + "reflect" ) // TestInterface is a simple interface providing Errorf, to make injection for @@ -57,8 +59,15 @@ func (f *FakeHandler) ServeHTTP(response http.ResponseWriter, request *http.Requ // ValidateRequest verifies that FakeHandler received a request with expected path, method, and body. func (f FakeHandler) ValidateRequest(t TestInterface, expectedPath, expectedMethod string, body *string) { - if f.RequestReceived.URL.Path != expectedPath { - t.Errorf("Unexpected request path for request %#v, received: %q, expected: %q", f.RequestReceived, f.RequestReceived.URL.Path, expectedPath) + expectURL, err := url.Parse(expectedPath) + if err != nil { + t.Errorf("Couldn't parse %v as a URL.", expectedPath) + } + if f.RequestReceived.URL.Path != expectURL.Path { + t.Errorf("Unexpected request path for request %#v, received: %q, expected: %q", f.RequestReceived, f.RequestReceived.URL.Path, expectURL.Path) + } + if e, a := expectURL.Query(), f.RequestReceived.URL.Query(); !reflect.DeepEqual(e, a) { + t.Errorf("Unexpected query for request %#v, received: %q, expected: %q", f.RequestReceived, a, e) } if f.RequestReceived.Method != expectedMethod { t.Errorf("Unexpected method: %q, expected: %q", f.RequestReceived.Method, expectedMethod) diff --git a/plugin/cmd/scheduler/scheduler.go b/plugin/cmd/scheduler/scheduler.go index f7727693f87..05242cc8ddc 100644 --- a/plugin/cmd/scheduler/scheduler.go +++ b/plugin/cmd/scheduler/scheduler.go @@ -18,80 +18,18 @@ package main import ( "flag" - "math/rand" - "time" - "github.com/GoogleCloudPlatform/kubernetes/pkg/api" "github.com/GoogleCloudPlatform/kubernetes/pkg/client" - "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" - "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" - algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" verflag "github.com/GoogleCloudPlatform/kubernetes/pkg/version/flag" - "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler" - - "github.com/golang/glog" + "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler/factory" ) var ( master = flag.String("master", "", "The address of the Kubernetes API server") ) -// storeToMinionLister turns a store into a minion lister. The store must contain (only) minions. -type storeToMinionLister struct { - cache.Store -} - -func (s *storeToMinionLister) List() (machines []string, err error) { - for _, m := range s.Store.List() { - machines = append(machines, m.(*api.Minion).ID) - } - return machines, nil -} - -// storeToPodLister turns a store into a pod lister. The store must contain (only) pods. -type storeToPodLister struct { - cache.Store -} - -func (s *storeToPodLister) ListPods(selector labels.Selector) (pods []api.Pod, err error) { - for _, m := range s.List() { - pod := m.(*api.Pod) - if selector.Matches(labels.Set(pod.Labels)) { - pods = append(pods, *pod) - } - } - return pods, nil -} - -// minionEnumerator allows a cache.Poller to enumerate items in an api.PodList -type minionEnumerator struct { - *api.MinionList -} - -// Returns the number of items in the pod list. -func (me *minionEnumerator) Len() int { - if me.MinionList == nil { - return 0 - } - return len(me.Items) -} - -// Returns the item (and ID) with the particular index. -func (me *minionEnumerator) Get(index int) (string, interface{}) { - return me.Items[index].ID, &me.Items[index] -} - -type binder struct { - *client.Client -} - -// Bind just does a POST binding RPC. -func (b *binder) Bind(binding *api.Binding) error { - return b.Post().Path("bindings").Body(binding).Do().Error() -} - func main() { flag.Parse() util.InitLogs() @@ -99,81 +37,12 @@ func main() { verflag.PrintAndExitIfRequested() - // This function is long because we inject all the dependencies into scheduler here. - // TODO: security story for plugins! kubeClient := client.New("http://"+*master, nil) - // Watch and queue pods that need scheduling. - podQueue := cache.NewFIFO() - cache.NewReflector(func(resourceVersion uint64) (watch.Interface, error) { - // This query will only find pods with no assigned host. - return kubeClient. - Get(). - Path("watch"). - Path("pods"). - SelectorParam("fields", labels.Set{"DesiredState.Host": ""}.AsSelector()). - UintParam("resourceVersion", resourceVersion). - Watch() - }, &api.Pod{}, podQueue).Run() - - // Watch and cache all running pods. Scheduler needs to find all pods - // so it knows where it's safe to place a pod. Cache this locally. - podCache := cache.NewStore() - cache.NewReflector(func(resourceVersion uint64) (watch.Interface, error) { - // This query will only find pods that do have an assigned host. - return kubeClient. - Get(). - Path("watch"). - Path("pods"). - ParseSelectorParam("fields", "DesiredState.Host!="). - UintParam("resourceVersion", resourceVersion). - Watch() - }, &api.Pod{}, podCache).Run() - - // Watch minions. - // Minions may be listed frequently, so provide a local up-to-date cache. - minionCache := cache.NewStore() - if false { - // Disable this code until minions support watches. - cache.NewReflector(func(resourceVersion uint64) (watch.Interface, error) { - // This query will only find pods that do have an assigned host. - return kubeClient. - Get(). - Path("watch"). - Path("minions"). - UintParam("resourceVersion", resourceVersion). - Watch() - }, &api.Minion{}, minionCache).Run() - } else { - cache.NewPoller(func() (cache.Enumerator, error) { - // This query will only find pods that do have an assigned host. - list := &api.MinionList{} - err := kubeClient.Get().Path("minions").Do().Into(list) - if err != nil { - return nil, err - } - return &minionEnumerator{list}, nil - }, 10*time.Second, minionCache).Run() - } - - r := rand.New(rand.NewSource(time.Now().UnixNano())) - algo := algorithm.NewRandomFitScheduler( - &storeToPodLister{podCache}, r) - - s := scheduler.New(&scheduler.Config{ - MinionLister: &storeToMinionLister{minionCache}, - Algorithm: algo, - Binder: &binder{kubeClient}, - NextPod: func() *api.Pod { - return podQueue.Pop().(*api.Pod) - }, - Error: func(pod *api.Pod, err error) { - glog.Errorf("Error scheduling %v: %v; retrying", pod.ID, err) - podQueue.Add(pod.ID, pod) - }, - }) - + configFactory := &factory.ConfigFactory{Client: kubeClient} + config := configFactory.Create() + s := scheduler.New(config) s.Run() select {} diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go new file mode 100644 index 00000000000..cbea5893e92 --- /dev/null +++ b/plugin/pkg/scheduler/factory/factory.go @@ -0,0 +1,177 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Package factory can set up a scheduler. This code is here instead of +// plugin/cmd/scheduler for both testability and reuse. +package factory + +import ( + "math/rand" + "time" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" + "github.com/GoogleCloudPlatform/kubernetes/plugin/pkg/scheduler" + + "github.com/golang/glog" +) + +// ConfigFactory knows how to fill out a scheduler config with its support functions. +type ConfigFactory struct { + Client *client.Client +} + +// Create creates a scheduler and all support functions. +func (factory *ConfigFactory) Create() *scheduler.Config { + // Watch and queue pods that need scheduling. + podQueue := cache.NewFIFO() + cache.NewReflector(factory.createUnassignedPodWatch, &api.Pod{}, podQueue).Run() + + // Watch and cache all running pods. Scheduler needs to find all pods + // so it knows where it's safe to place a pod. Cache this locally. + podCache := cache.NewStore() + cache.NewReflector(factory.createAssignedPodWatch, &api.Pod{}, podCache).Run() + + // Watch minions. + // Minions may be listed frequently, so provide a local up-to-date cache. + minionCache := cache.NewStore() + if false { + // Disable this code until minions support watches. + cache.NewReflector(factory.createMinionWatch, &api.Minion{}, minionCache).Run() + } else { + cache.NewPoller(factory.pollMinions, 10*time.Second, minionCache).Run() + } + + r := rand.New(rand.NewSource(time.Now().UnixNano())) + algo := algorithm.NewRandomFitScheduler( + &storeToPodLister{podCache}, r) + + return &scheduler.Config{ + MinionLister: &storeToMinionLister{minionCache}, + Algorithm: algo, + Binder: &binder{factory.Client}, + NextPod: func() *api.Pod { + return podQueue.Pop().(*api.Pod) + }, + Error: factory.defaultErrorFunc, + } +} + +// createUnassignedPodWatch starts a watch that finds all pods that need to be +// scheduled. +func (factory *ConfigFactory) createUnassignedPodWatch(resourceVersion uint64) (watch.Interface, error) { + return factory.Client. + Get(). + Path("watch"). + Path("pods"). + SelectorParam("fields", labels.Set{"DesiredState.Host": ""}.AsSelector()). + UintParam("resourceVersion", resourceVersion). + Watch() +} + +// createUnassignedPodWatch starts a watch that finds all pods that are +// already scheduled. +func (factory *ConfigFactory) createAssignedPodWatch(resourceVersion uint64) (watch.Interface, error) { + return factory.Client. + Get(). + Path("watch"). + Path("pods"). + ParseSelectorParam("fields", "DesiredState.Host!="). + UintParam("resourceVersion", resourceVersion). + Watch() +} + +// createMinionWatch starts a watch that gets all changes to minions. +func (factory *ConfigFactory) createMinionWatch(resourceVersion uint64) (watch.Interface, error) { + return factory.Client. + Get(). + Path("watch"). + Path("minions"). + UintParam("resourceVersion", resourceVersion). + Watch() +} + +// pollMinions lists all minions and returns an enumerator for cache.Poller. +func (factory *ConfigFactory) pollMinions() (cache.Enumerator, error) { + list := &api.MinionList{} + err := factory.Client.Get().Path("minions").Do().Into(list) + if err != nil { + return nil, err + } + return &minionEnumerator{list}, nil +} + +func (factory *ConfigFactory) defaultErrorFunc(pod *api.Pod, err error) { + glog.Errorf("Error scheduling %v: %v; retrying", pod.ID, err) +} + +// storeToMinionLister turns a store into a minion lister. The store must contain (only) minions. +type storeToMinionLister struct { + cache.Store +} + +func (s *storeToMinionLister) List() (machines []string, err error) { + for _, m := range s.Store.List() { + machines = append(machines, m.(*api.Minion).ID) + } + return machines, nil +} + +// storeToPodLister turns a store into a pod lister. The store must contain (only) pods. +type storeToPodLister struct { + cache.Store +} + +func (s *storeToPodLister) ListPods(selector labels.Selector) (pods []api.Pod, err error) { + for _, m := range s.List() { + pod := m.(*api.Pod) + if selector.Matches(labels.Set(pod.Labels)) { + pods = append(pods, *pod) + } + } + return pods, nil +} + +// minionEnumerator allows a cache.Poller to enumerate items in an api.PodList +type minionEnumerator struct { + *api.MinionList +} + +// Returns the number of items in the pod list. +func (me *minionEnumerator) Len() int { + if me.MinionList == nil { + return 0 + } + return len(me.Items) +} + +// Returns the item (and ID) with the particular index. +func (me *minionEnumerator) Get(index int) (string, interface{}) { + return me.Items[index].ID, &me.Items[index] +} + +type binder struct { + *client.Client +} + +// Bind just does a POST binding RPC. +func (b *binder) Bind(binding *api.Binding) error { + return b.Post().Path("bindings").Body(binding).Do().Error() +} diff --git a/plugin/pkg/scheduler/factory/factory_test.go b/plugin/pkg/scheduler/factory/factory_test.go new file mode 100644 index 00000000000..c542423526f --- /dev/null +++ b/plugin/pkg/scheduler/factory/factory_test.go @@ -0,0 +1,224 @@ +/* +Copyright 2014 Google Inc. All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package factory + +import ( + "net/http/httptest" + "reflect" + "testing" + + "github.com/GoogleCloudPlatform/kubernetes/pkg/api" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client" + "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" + "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/util" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" +) + +func TestCreate(t *testing.T) { + handler := util.FakeHandler{ + StatusCode: 500, + ResponseBody: "", + T: t, + } + server := httptest.NewServer(&handler) + factory := ConfigFactory{client.New(server.URL, nil)} + factory.Create() +} + +func TestCreateWatches(t *testing.T) { + factory := ConfigFactory{nil} + table := []struct { + rv uint64 + location string + watchFactory func(rv uint64) (watch.Interface, error) + }{ + // Minion watch + { + rv: 0, + location: "/api/v1beta1/watch/minions?resourceVersion=0", + watchFactory: factory.createMinionWatch, + }, { + rv: 42, + location: "/api/v1beta1/watch/minions?resourceVersion=42", + watchFactory: factory.createMinionWatch, + }, + // Assigned pod watches + { + rv: 0, + location: "/api/v1beta1/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=0", + watchFactory: factory.createAssignedPodWatch, + }, { + rv: 42, + location: "/api/v1beta1/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=42", + watchFactory: factory.createAssignedPodWatch, + }, + // Unassigned pod watches + { + rv: 0, + location: "/api/v1beta1/watch/pods?fields=DesiredState.Host%3D&resourceVersion=0", + watchFactory: factory.createUnassignedPodWatch, + }, { + rv: 42, + location: "/api/v1beta1/watch/pods?fields=DesiredState.Host%3D&resourceVersion=42", + watchFactory: factory.createUnassignedPodWatch, + }, + } + + for _, item := range table { + handler := util.FakeHandler{ + StatusCode: 500, + ResponseBody: "", + T: t, + } + server := httptest.NewServer(&handler) + factory.Client = client.New(server.URL, nil) + // This test merely tests that the correct request is made. + item.watchFactory(item.rv) + handler.ValidateRequest(t, item.location, "GET", nil) + } +} + +func TestPollMinions(t *testing.T) { + table := []struct { + minions []api.Minion + }{ + { + minions: []api.Minion{ + {JSONBase: api.JSONBase{ID: "foo"}}, + {JSONBase: api.JSONBase{ID: "bar"}}, + }, + }, + } + + for _, item := range table { + ml := &api.MinionList{Items: item.minions} + handler := util.FakeHandler{ + StatusCode: 200, + ResponseBody: api.EncodeOrDie(ml), + T: t, + } + server := httptest.NewServer(&handler) + cf := ConfigFactory{client.New(server.URL, nil)} + + ce, err := cf.pollMinions() + if err != nil { + t.Errorf("Unexpected error: %v", err) + continue + } + handler.ValidateRequest(t, "/api/v1beta1/minions", "GET", nil) + + if e, a := len(item.minions), ce.Len(); e != a { + t.Errorf("Expected %v, got %v", e, a) + } + } +} + +func TestStoreToMinionLister(t *testing.T) { + store := cache.NewStore() + ids := util.NewStringSet("foo", "bar", "baz") + for id := range ids { + store.Add(id, &api.Minion{JSONBase: api.JSONBase{ID: id}}) + } + sml := storeToMinionLister{store} + + got, err := sml.List() + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + if !ids.HasAll(got...) || len(got) != len(ids) { + t.Errorf("Expected %v, got %v", ids, got) + } +} + +func TestStoreToPodLister(t *testing.T) { + store := cache.NewStore() + ids := []string{"foo", "bar", "baz"} + for _, id := range ids { + store.Add(id, &api.Pod{ + JSONBase: api.JSONBase{ID: id}, + Labels: map[string]string{"name": id}, + }) + } + spl := storeToPodLister{store} + + for _, id := range ids { + got, err := spl.ListPods(labels.Set{"name": id}.AsSelector()) + if err != nil { + t.Errorf("Unexpected error: %v", err) + continue + } + if e, a := 1, len(got); e != a { + t.Errorf("Expected %v, got %v", e, a) + continue + } + if e, a := id, got[0].ID; e != a { + t.Errorf("Expected %v, got %v", e, a) + continue + } + } +} + +func TestMinionEnumerator(t *testing.T) { + testList := &api.MinionList{ + Items: []api.Minion{ + {JSONBase: api.JSONBase{ID: "foo"}}, + {JSONBase: api.JSONBase{ID: "bar"}}, + {JSONBase: api.JSONBase{ID: "baz"}}, + }, + } + me := minionEnumerator{testList} + + if e, a := 3, me.Len(); e != a { + t.Fatalf("expected %v, got %v", e, a) + } + for i := range testList.Items { + gotID, gotObj := me.Get(i) + if e, a := testList.Items[i].ID, gotID; e != a { + t.Errorf("Expected %v, got %v", e, a) + } + if e, a := &testList.Items[i], gotObj; !reflect.DeepEqual(e, a) { + t.Errorf("Expected %#v, got %v#", e, a) + } + } +} + +func TestBind(t *testing.T) { + table := []struct { + binding *api.Binding + }{ + {binding: &api.Binding{PodID: "foo", Host: "foohost.kubernetes.mydomain.com"}}, + } + + for _, item := range table { + handler := util.FakeHandler{ + StatusCode: 200, + ResponseBody: "", + T: t, + } + server := httptest.NewServer(&handler) + b := binder{client.New(server.URL, nil)} + + err := b.Bind(item.binding) + if err != nil { + t.Errorf("Unexpected error: %v", err) + continue + } + expectedBody := api.EncodeOrDie(item.binding) + handler.ValidateRequest(t, "/api/v1beta1/bindings", "POST", &expectedBody) + } +}