diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index e63be7713fb..c67aa863785 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -26,6 +26,7 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" @@ -43,19 +44,19 @@ type ConfigFactory struct { func (factory *ConfigFactory) Create() *scheduler.Config { // Watch and queue pods that need scheduling. podQueue := cache.NewFIFO() - cache.NewReflector(factory.createUnassignedPodWatch, &api.Pod{}, podQueue).Run() + cache.NewReflector(factory.createUnassignedPodLW(), &api.Pod{}, podQueue).Run() // Watch and cache all running pods. Scheduler needs to find all pods // so it knows where it's safe to place a pod. Cache this locally. podCache := cache.NewStore() - cache.NewReflector(factory.createAssignedPodWatch, &api.Pod{}, podCache).Run() + cache.NewReflector(factory.createAssignedPodLW(), &api.Pod{}, podCache).Run() // Watch minions. // Minions may be listed frequently, so provide a local up-to-date cache. minionCache := cache.NewStore() if false { // Disable this code until minions support watches. - cache.NewReflector(factory.createMinionWatch, &api.Minion{}, minionCache).Run() + cache.NewReflector(factory.createMinionLW(), &api.Minion{}, minionCache).Run() } else { cache.NewPoller(factory.pollMinions, 10*time.Second, minionCache).Run() } @@ -82,38 +83,66 @@ func (factory *ConfigFactory) Create() *scheduler.Config { } } -// createUnassignedPodWatch starts a watch that finds all pods that need to be +type listWatch struct { + client *client.Client + fieldSelector labels.Selector + resource string +} + +func (lw *listWatch) List() (runtime.Object, error) { + return lw.client. + Get(). + Path(lw.resource). + SelectorParam("fields", lw.fieldSelector). + Do(). + Get() +} + +func (lw *listWatch) Watch(resourceVersion uint64) (watch.Interface, error) { + return lw.client. + Get(). + Path("watch"). + Path(lw.resource). + SelectorParam("fields", lw.fieldSelector). + UintParam("resourceVersion", resourceVersion). + Watch() +} + +// createUnassignedPodLW returns a listWatch that finds all pods that need to be // scheduled. -func (factory *ConfigFactory) createUnassignedPodWatch(resourceVersion uint64) (watch.Interface, error) { - return factory.Client. - Get(). - Path("watch"). - Path("pods"). - SelectorParam("fields", labels.Set{"DesiredState.Host": ""}.AsSelector()). - UintParam("resourceVersion", resourceVersion). - Watch() +func (factory *ConfigFactory) createUnassignedPodLW() *listWatch { + return &listWatch{ + client: factory.Client, + fieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(), + resource: "pods", + } } -// createUnassignedPodWatch starts a watch that finds all pods that are +func parseSelectorOrDie(s string) labels.Selector { + selector, err := labels.ParseSelector(s) + if err != nil { + panic(err) + } + return selector +} + +// createUnassignedPodLW returns a listWatch that finds all pods that are // already scheduled. -func (factory *ConfigFactory) createAssignedPodWatch(resourceVersion uint64) (watch.Interface, error) { - return factory.Client. - Get(). - Path("watch"). - Path("pods"). - ParseSelectorParam("fields", "DesiredState.Host!="). - UintParam("resourceVersion", resourceVersion). - Watch() +func (factory *ConfigFactory) createAssignedPodLW() *listWatch { + return &listWatch{ + client: factory.Client, + fieldSelector: parseSelectorOrDie("DesiredState.Host!="), + resource: "pods", + } } -// createMinionWatch starts a watch that gets all changes to minions. -func (factory *ConfigFactory) createMinionWatch(resourceVersion uint64) (watch.Interface, error) { - return factory.Client. - Get(). - Path("watch"). - Path("minions"). - UintParam("resourceVersion", resourceVersion). - Watch() +// createMinionLW returns a listWatch that gets all changes to minions. +func (factory *ConfigFactory) createMinionLW() *listWatch { + return &listWatch{ + client: factory.Client, + fieldSelector: parseSelectorOrDie(""), + resource: "minions", + } } // pollMinions lists all minions and returns an enumerator for cache.Poller. diff --git a/plugin/pkg/scheduler/factory/factory_test.go b/plugin/pkg/scheduler/factory/factory_test.go index 4ad09db1862..1845d41c363 100644 --- a/plugin/pkg/scheduler/factory/factory_test.go +++ b/plugin/pkg/scheduler/factory/factory_test.go @@ -30,7 +30,6 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" "github.com/GoogleCloudPlatform/kubernetes/pkg/util" - "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) func TestCreate(t *testing.T) { @@ -45,42 +44,26 @@ func TestCreate(t *testing.T) { factory.Create() } -func TestCreateWatches(t *testing.T) { +func TestCreateLists(t *testing.T) { factory := ConfigFactory{nil} table := []struct { - rv uint64 - location string - watchFactory func(rv uint64) (watch.Interface, error) + location string + factory func() *listWatch }{ - // Minion watch + // Minion { - rv: 0, - location: "/api/v1beta1/watch/minions?resourceVersion=0", - watchFactory: factory.createMinionWatch, - }, { - rv: 42, - location: "/api/v1beta1/watch/minions?resourceVersion=42", - watchFactory: factory.createMinionWatch, + location: "/api/v1beta1/minions?fields=", + factory: factory.createMinionLW, }, - // Assigned pod watches + // Assigned pod { - rv: 0, - location: "/api/v1beta1/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=0", - watchFactory: factory.createAssignedPodWatch, - }, { - rv: 42, - location: "/api/v1beta1/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=42", - watchFactory: factory.createAssignedPodWatch, + location: "/api/v1beta1/pods?fields=DesiredState.Host!%3D", + factory: factory.createAssignedPodLW, }, - // Unassigned pod watches + // Unassigned pod { - rv: 0, - location: "/api/v1beta1/watch/pods?fields=DesiredState.Host%3D&resourceVersion=0", - watchFactory: factory.createUnassignedPodWatch, - }, { - rv: 42, - location: "/api/v1beta1/watch/pods?fields=DesiredState.Host%3D&resourceVersion=42", - watchFactory: factory.createUnassignedPodWatch, + location: "/api/v1beta1/pods?fields=DesiredState.Host%3D", + factory: factory.createUnassignedPodLW, }, } @@ -93,7 +76,60 @@ func TestCreateWatches(t *testing.T) { server := httptest.NewServer(&handler) factory.Client = client.NewOrDie(server.URL, nil) // This test merely tests that the correct request is made. - item.watchFactory(item.rv) + item.factory().List() + handler.ValidateRequest(t, item.location, "GET", nil) + } +} + +func TestCreateWatches(t *testing.T) { + factory := ConfigFactory{nil} + table := []struct { + rv uint64 + location string + factory func() *listWatch + }{ + // Minion watch + { + rv: 0, + location: "/api/v1beta1/watch/minions?fields=&resourceVersion=0", + factory: factory.createMinionLW, + }, { + rv: 42, + location: "/api/v1beta1/watch/minions?fields=&resourceVersion=42", + factory: factory.createMinionLW, + }, + // Assigned pod watches + { + rv: 0, + location: "/api/v1beta1/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=0", + factory: factory.createAssignedPodLW, + }, { + rv: 42, + location: "/api/v1beta1/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=42", + factory: factory.createAssignedPodLW, + }, + // Unassigned pod watches + { + rv: 0, + location: "/api/v1beta1/watch/pods?fields=DesiredState.Host%3D&resourceVersion=0", + factory: factory.createUnassignedPodLW, + }, { + rv: 42, + location: "/api/v1beta1/watch/pods?fields=DesiredState.Host%3D&resourceVersion=42", + factory: factory.createUnassignedPodLW, + }, + } + + for _, item := range table { + handler := util.FakeHandler{ + StatusCode: 500, + ResponseBody: "", + T: t, + } + server := httptest.NewServer(&handler) + factory.Client = client.NewOrDie(server.URL, nil) + // This test merely tests that the correct request is made. + item.factory().Watch(item.rv) handler.ValidateRequest(t, item.location, "GET", nil) } }