Plumb pkg/client/cache changes into scheduler

This commit is contained in:
Daniel Smith 2014-09-16 14:08:57 -07:00
parent b1169ed91e
commit 3f659f7d74
2 changed files with 124 additions and 59 deletions

View File

@ -26,6 +26,7 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
algorithm "github.com/GoogleCloudPlatform/kubernetes/pkg/scheduler"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
@ -43,19 +44,19 @@ type ConfigFactory struct {
func (factory *ConfigFactory) Create() *scheduler.Config {
// Watch and queue pods that need scheduling.
podQueue := cache.NewFIFO()
cache.NewReflector(factory.createUnassignedPodWatch, &api.Pod{}, podQueue).Run()
cache.NewReflector(factory.createUnassignedPodLW(), &api.Pod{}, podQueue).Run()
// Watch and cache all running pods. Scheduler needs to find all pods
// so it knows where it's safe to place a pod. Cache this locally.
podCache := cache.NewStore()
cache.NewReflector(factory.createAssignedPodWatch, &api.Pod{}, podCache).Run()
cache.NewReflector(factory.createAssignedPodLW(), &api.Pod{}, podCache).Run()
// Watch minions.
// Minions may be listed frequently, so provide a local up-to-date cache.
minionCache := cache.NewStore()
if false {
// Disable this code until minions support watches.
cache.NewReflector(factory.createMinionWatch, &api.Minion{}, minionCache).Run()
cache.NewReflector(factory.createMinionLW(), &api.Minion{}, minionCache).Run()
} else {
cache.NewPoller(factory.pollMinions, 10*time.Second, minionCache).Run()
}
@ -82,38 +83,66 @@ func (factory *ConfigFactory) Create() *scheduler.Config {
}
}
// createUnassignedPodWatch starts a watch that finds all pods that need to be
type listWatch struct {
client *client.Client
fieldSelector labels.Selector
resource string
}
func (lw *listWatch) List() (runtime.Object, error) {
return lw.client.
Get().
Path(lw.resource).
SelectorParam("fields", lw.fieldSelector).
Do().
Get()
}
func (lw *listWatch) Watch(resourceVersion uint64) (watch.Interface, error) {
return lw.client.
Get().
Path("watch").
Path(lw.resource).
SelectorParam("fields", lw.fieldSelector).
UintParam("resourceVersion", resourceVersion).
Watch()
}
// createUnassignedPodLW returns a listWatch that finds all pods that need to be
// scheduled.
func (factory *ConfigFactory) createUnassignedPodWatch(resourceVersion uint64) (watch.Interface, error) {
return factory.Client.
Get().
Path("watch").
Path("pods").
SelectorParam("fields", labels.Set{"DesiredState.Host": ""}.AsSelector()).
UintParam("resourceVersion", resourceVersion).
Watch()
func (factory *ConfigFactory) createUnassignedPodLW() *listWatch {
return &listWatch{
client: factory.Client,
fieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(),
resource: "pods",
}
}
// createUnassignedPodWatch starts a watch that finds all pods that are
func parseSelectorOrDie(s string) labels.Selector {
selector, err := labels.ParseSelector(s)
if err != nil {
panic(err)
}
return selector
}
// createUnassignedPodLW returns a listWatch that finds all pods that are
// already scheduled.
func (factory *ConfigFactory) createAssignedPodWatch(resourceVersion uint64) (watch.Interface, error) {
return factory.Client.
Get().
Path("watch").
Path("pods").
ParseSelectorParam("fields", "DesiredState.Host!=").
UintParam("resourceVersion", resourceVersion).
Watch()
func (factory *ConfigFactory) createAssignedPodLW() *listWatch {
return &listWatch{
client: factory.Client,
fieldSelector: parseSelectorOrDie("DesiredState.Host!="),
resource: "pods",
}
}
// createMinionWatch starts a watch that gets all changes to minions.
func (factory *ConfigFactory) createMinionWatch(resourceVersion uint64) (watch.Interface, error) {
return factory.Client.
Get().
Path("watch").
Path("minions").
UintParam("resourceVersion", resourceVersion).
Watch()
// createMinionLW returns a listWatch that gets all changes to minions.
func (factory *ConfigFactory) createMinionLW() *listWatch {
return &listWatch{
client: factory.Client,
fieldSelector: parseSelectorOrDie(""),
resource: "minions",
}
}
// pollMinions lists all minions and returns an enumerator for cache.Poller.

View File

@ -30,7 +30,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
func TestCreate(t *testing.T) {
@ -45,42 +44,26 @@ func TestCreate(t *testing.T) {
factory.Create()
}
func TestCreateWatches(t *testing.T) {
func TestCreateLists(t *testing.T) {
factory := ConfigFactory{nil}
table := []struct {
rv uint64
location string
watchFactory func(rv uint64) (watch.Interface, error)
location string
factory func() *listWatch
}{
// Minion watch
// Minion
{
rv: 0,
location: "/api/v1beta1/watch/minions?resourceVersion=0",
watchFactory: factory.createMinionWatch,
}, {
rv: 42,
location: "/api/v1beta1/watch/minions?resourceVersion=42",
watchFactory: factory.createMinionWatch,
location: "/api/v1beta1/minions?fields=",
factory: factory.createMinionLW,
},
// Assigned pod watches
// Assigned pod
{
rv: 0,
location: "/api/v1beta1/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=0",
watchFactory: factory.createAssignedPodWatch,
}, {
rv: 42,
location: "/api/v1beta1/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=42",
watchFactory: factory.createAssignedPodWatch,
location: "/api/v1beta1/pods?fields=DesiredState.Host!%3D",
factory: factory.createAssignedPodLW,
},
// Unassigned pod watches
// Unassigned pod
{
rv: 0,
location: "/api/v1beta1/watch/pods?fields=DesiredState.Host%3D&resourceVersion=0",
watchFactory: factory.createUnassignedPodWatch,
}, {
rv: 42,
location: "/api/v1beta1/watch/pods?fields=DesiredState.Host%3D&resourceVersion=42",
watchFactory: factory.createUnassignedPodWatch,
location: "/api/v1beta1/pods?fields=DesiredState.Host%3D",
factory: factory.createUnassignedPodLW,
},
}
@ -93,7 +76,60 @@ func TestCreateWatches(t *testing.T) {
server := httptest.NewServer(&handler)
factory.Client = client.NewOrDie(server.URL, nil)
// This test merely tests that the correct request is made.
item.watchFactory(item.rv)
item.factory().List()
handler.ValidateRequest(t, item.location, "GET", nil)
}
}
func TestCreateWatches(t *testing.T) {
factory := ConfigFactory{nil}
table := []struct {
rv uint64
location string
factory func() *listWatch
}{
// Minion watch
{
rv: 0,
location: "/api/v1beta1/watch/minions?fields=&resourceVersion=0",
factory: factory.createMinionLW,
}, {
rv: 42,
location: "/api/v1beta1/watch/minions?fields=&resourceVersion=42",
factory: factory.createMinionLW,
},
// Assigned pod watches
{
rv: 0,
location: "/api/v1beta1/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=0",
factory: factory.createAssignedPodLW,
}, {
rv: 42,
location: "/api/v1beta1/watch/pods?fields=DesiredState.Host!%3D&resourceVersion=42",
factory: factory.createAssignedPodLW,
},
// Unassigned pod watches
{
rv: 0,
location: "/api/v1beta1/watch/pods?fields=DesiredState.Host%3D&resourceVersion=0",
factory: factory.createUnassignedPodLW,
}, {
rv: 42,
location: "/api/v1beta1/watch/pods?fields=DesiredState.Host%3D&resourceVersion=42",
factory: factory.createUnassignedPodLW,
},
}
for _, item := range table {
handler := util.FakeHandler{
StatusCode: 500,
ResponseBody: "",
T: t,
}
server := httptest.NewServer(&handler)
factory.Client = client.NewOrDie(server.URL, nil)
// This test merely tests that the correct request is made.
item.factory().Watch(item.rv)
handler.ValidateRequest(t, item.location, "GET", nil)
}
}