Merge pull request #4453 from derekwaynecarr/make_quota_more_efficient

Make ListWatch work with a ListFunc and WatchFunc
This commit is contained in:
Clayton Coleman 2015-02-17 14:48:11 -05:00
commit 4859aa7cd8
7 changed files with 94 additions and 104 deletions

View File

@ -23,33 +23,37 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch" "github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
) )
// ListFunc knows how to list resources
type ListFunc func() (runtime.Object, error)
// WatchFunc knows how to watch resources
type WatchFunc func(resourceVersion string) (watch.Interface, error)
// ListWatch knows how to list and watch a set of apiserver resources. It satisfies the ListerWatcher interface. // ListWatch knows how to list and watch a set of apiserver resources. It satisfies the ListerWatcher interface.
// It is a convenience function for users of NewReflector, etc. Client must not be nil. // It is a convenience function for users of NewReflector, etc.
// ListFunc and WatchFunc must not be nil
type ListWatch struct { type ListWatch struct {
Client *client.Client ListFunc ListFunc
FieldSelector labels.Selector WatchFunc WatchFunc
Resource string
Namespace string
} }
// ListWatch knows how to list and watch a set of apiserver resources. // ListWatchFromClient creates a new ListWatch from the specified client, resource, namespace and field selector
func NewListWatchFromClient(client *client.Client, resource string, namespace string, fieldSelector labels.Selector) *ListWatch {
listFunc := func() (runtime.Object, error) {
return client.Get().Namespace(namespace).Resource(resource).SelectorParam("fields", fieldSelector).Do().Get()
}
watchFunc := func(resourceVersion string) (watch.Interface, error) {
return client.Get().Prefix("watch").Namespace(namespace).Resource(resource).SelectorParam("fields", fieldSelector).Param("resourceVersion", resourceVersion).Watch()
}
return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc}
}
// List a set of apiserver resources
func (lw *ListWatch) List() (runtime.Object, error) { func (lw *ListWatch) List() (runtime.Object, error) {
return lw.Client. return lw.ListFunc()
Get().
Namespace(lw.Namespace).
Resource(lw.Resource).
SelectorParam("fields", lw.FieldSelector).
Do().
Get()
} }
// Watch a set of apiserver resources
func (lw *ListWatch) Watch(resourceVersion string) (watch.Interface, error) { func (lw *ListWatch) Watch(resourceVersion string) (watch.Interface, error) {
return lw.Client. return lw.WatchFunc(resourceVersion)
Get().
Prefix("watch").
Namespace(lw.Namespace).
Resource(lw.Resource).
SelectorParam("fields", lw.FieldSelector).
Param("resourceVersion", resourceVersion).
Watch()
} }

View File

@ -70,33 +70,31 @@ func buildLocation(resourcePath string, query url.Values) string {
func TestListWatchesCanList(t *testing.T) { func TestListWatchesCanList(t *testing.T) {
table := []struct { table := []struct {
location string location string
lw ListWatch resource string
namespace string
fieldSelector labels.Selector
}{ }{
// Minion // Minion
{ {
location: buildLocation(buildResourcePath("", api.NamespaceAll, "minions"), buildQueryValues(api.NamespaceAll, nil)), location: buildLocation(buildResourcePath("", api.NamespaceAll, "minions"), buildQueryValues(api.NamespaceAll, nil)),
lw: ListWatch{ resource: "minions",
FieldSelector: parseSelectorOrDie(""), namespace: api.NamespaceAll,
Resource: "minions", fieldSelector: parseSelectorOrDie(""),
},
}, },
// pod with "assigned" field selector. // pod with "assigned" field selector.
{ {
location: buildLocation(buildResourcePath("", api.NamespaceAll, "pods"), buildQueryValues(api.NamespaceAll, url.Values{"fields": []string{"DesiredState.Host="}})), location: buildLocation(buildResourcePath("", api.NamespaceAll, "pods"), buildQueryValues(api.NamespaceAll, url.Values{"fields": []string{"DesiredState.Host="}})),
lw: ListWatch{ resource: "pods",
FieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(), namespace: api.NamespaceAll,
Resource: "pods", fieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(),
},
}, },
// pod in namespace "foo" // pod in namespace "foo"
{ {
location: buildLocation(buildResourcePath("", "foo", "pods"), buildQueryValues("foo", url.Values{"fields": []string{"DesiredState.Host="}})), location: buildLocation(buildResourcePath("", "foo", "pods"), buildQueryValues("foo", url.Values{"fields": []string{"DesiredState.Host="}})),
lw: ListWatch{ resource: "pods",
FieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(), namespace: "foo",
Resource: "pods", fieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(),
Namespace: "foo",
},
}, },
} }
for _, item := range table { for _, item := range table {
@ -107,54 +105,52 @@ func TestListWatchesCanList(t *testing.T) {
} }
server := httptest.NewServer(&handler) server := httptest.NewServer(&handler)
defer server.Close() defer server.Close()
item.lw.Client = client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()}) client := client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()})
lw := NewListWatchFromClient(client, item.resource, item.namespace, item.fieldSelector)
// This test merely tests that the correct request is made. // This test merely tests that the correct request is made.
item.lw.List() lw.List()
handler.ValidateRequest(t, item.location, "GET", nil) handler.ValidateRequest(t, item.location, "GET", nil)
} }
} }
func TestListWatchesCanWatch(t *testing.T) { func TestListWatchesCanWatch(t *testing.T) {
table := []struct { table := []struct {
rv string rv string
location string location string
lw ListWatch resource string
namespace string
fieldSelector labels.Selector
}{ }{
// Minion // Minion
{ {
location: buildLocation(buildResourcePath("watch", api.NamespaceAll, "minions"), buildQueryValues(api.NamespaceAll, url.Values{"resourceVersion": []string{""}})), location: buildLocation(buildResourcePath("watch", api.NamespaceAll, "minions"), buildQueryValues(api.NamespaceAll, url.Values{"resourceVersion": []string{""}})),
rv: "", rv: "",
lw: ListWatch{ resource: "minions",
FieldSelector: parseSelectorOrDie(""), namespace: api.NamespaceAll,
Resource: "minions", fieldSelector: parseSelectorOrDie(""),
},
}, },
{ {
location: buildLocation(buildResourcePath("watch", api.NamespaceAll, "minions"), buildQueryValues(api.NamespaceAll, url.Values{"resourceVersion": []string{"42"}})), location: buildLocation(buildResourcePath("watch", api.NamespaceAll, "minions"), buildQueryValues(api.NamespaceAll, url.Values{"resourceVersion": []string{"42"}})),
rv: "42", rv: "42",
lw: ListWatch{ resource: "minions",
FieldSelector: parseSelectorOrDie(""), namespace: api.NamespaceAll,
Resource: "minions", fieldSelector: parseSelectorOrDie(""),
},
}, },
// pod with "assigned" field selector. // pod with "assigned" field selector.
{ {
location: buildLocation(buildResourcePath("watch", api.NamespaceAll, "pods"), buildQueryValues(api.NamespaceAll, url.Values{"fields": []string{"DesiredState.Host="}, "resourceVersion": []string{"0"}})), location: buildLocation(buildResourcePath("watch", api.NamespaceAll, "pods"), buildQueryValues(api.NamespaceAll, url.Values{"fields": []string{"DesiredState.Host="}, "resourceVersion": []string{"0"}})),
rv: "0", rv: "0",
lw: ListWatch{ resource: "pods",
FieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(), namespace: api.NamespaceAll,
Resource: "pods", fieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(),
},
}, },
// pod with namespace foo and assigned field selector // pod with namespace foo and assigned field selector
{ {
location: buildLocation(buildResourcePath("watch", "foo", "pods"), buildQueryValues("foo", url.Values{"fields": []string{"DesiredState.Host="}, "resourceVersion": []string{"0"}})), location: buildLocation(buildResourcePath("watch", "foo", "pods"), buildQueryValues("foo", url.Values{"fields": []string{"DesiredState.Host="}, "resourceVersion": []string{"0"}})),
rv: "0", rv: "0",
lw: ListWatch{ resource: "pods",
FieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(), namespace: "foo",
Resource: "pods", fieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(),
Namespace: "foo",
},
}, },
} }
@ -166,10 +162,10 @@ func TestListWatchesCanWatch(t *testing.T) {
} }
server := httptest.NewServer(&handler) server := httptest.NewServer(&handler)
defer server.Close() defer server.Close()
item.lw.Client = client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()}) client := client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()})
lw := NewListWatchFromClient(client, item.resource, item.namespace, item.fieldSelector)
// This test merely tests that the correct request is made. // This test merely tests that the correct request is made.
item.lw.Watch(item.rv) lw.Watch(item.rv)
handler.ValidateRequest(t, item.location, "GET", nil) handler.ValidateRequest(t, item.location, "GET", nil)
} }
} }

View File

@ -28,11 +28,7 @@ import (
// NewSourceApiserver creates a config source that watches and pulls from the apiserver. // NewSourceApiserver creates a config source that watches and pulls from the apiserver.
func NewSourceApiserver(client *client.Client, hostname string, updates chan<- interface{}) { func NewSourceApiserver(client *client.Client, hostname string, updates chan<- interface{}) {
lw := &cache.ListWatch{ lw := cache.NewListWatchFromClient(client, "pods", api.NamespaceAll, labels.OneTermEqualSelector("Status.Host", hostname))
Client: client,
FieldSelector: labels.OneTermEqualSelector("Status.Host", hostname),
Resource: "pods",
}
newSourceApiserverFromLW(lw, updates) newSourceApiserverFromLW(lw, updates)
} }

View File

@ -95,7 +95,7 @@ func NewMainKubelet(
serviceStore := cache.NewStore(cache.MetaNamespaceKeyFunc) serviceStore := cache.NewStore(cache.MetaNamespaceKeyFunc)
if kubeClient != nil { if kubeClient != nil {
cache.NewReflector(&cache.ListWatch{kubeClient, labels.Everything(), "services", api.NamespaceAll}, &api.Service{}, serviceStore).Run() cache.NewReflector(cache.NewListWatchFromClient(kubeClient, "services", api.NamespaceAll, labels.Everything()), &api.Service{}, serviceStore).Run()
} }
serviceLister := &cache.StoreToServiceLister{serviceStore} serviceLister := &cache.StoreToServiceLister{serviceStore}

View File

@ -26,6 +26,8 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
) )
func init() { func init() {
@ -79,9 +81,12 @@ func NewProvision(c client.Interface) admission.Interface {
store := cache.NewStore(cache.MetaNamespaceKeyFunc) store := cache.NewStore(cache.MetaNamespaceKeyFunc)
reflector := cache.NewReflector( reflector := cache.NewReflector(
&cache.ListWatch{ &cache.ListWatch{
Client: c.(*client.Client), ListFunc: func() (runtime.Object, error) {
FieldSelector: labels.Everything(), return c.Namespaces().List(labels.Everything())
Resource: "namespaces", },
WatchFunc: func(resourceVersion string) (watch.Interface, error) {
return c.Namespaces().Watch(labels.Everything(), labels.Everything(), resourceVersion)
},
}, },
&api.Namespace{}, &api.Namespace{},
store, store,

View File

@ -28,6 +28,8 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
) )
func init() { func init() {
@ -83,9 +85,12 @@ func NewExists(c client.Interface) admission.Interface {
// TODO: look into a list/watch that can work with client.Interface, maybe pass it a ListFunc and a WatchFunc // TODO: look into a list/watch that can work with client.Interface, maybe pass it a ListFunc and a WatchFunc
reflector := cache.NewReflector( reflector := cache.NewReflector(
&cache.ListWatch{ &cache.ListWatch{
Client: c.(*client.Client), ListFunc: func() (runtime.Object, error) {
FieldSelector: labels.Everything(), return c.Namespaces().List(labels.Everything())
Resource: "namespaces", },
WatchFunc: func(resourceVersion string) (watch.Interface, error) {
return c.Namespaces().Watch(labels.Everything(), labels.Everything(), resourceVersion)
},
}, },
&api.Namespace{}, &api.Namespace{},
store, store,

View File

@ -143,11 +143,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe
// createUnassignedPodLW returns a cache.ListWatch that finds all pods that need to be // createUnassignedPodLW returns a cache.ListWatch that finds all pods that need to be
// scheduled. // scheduled.
func (factory *ConfigFactory) createUnassignedPodLW() *cache.ListWatch { func (factory *ConfigFactory) createUnassignedPodLW() *cache.ListWatch {
return &cache.ListWatch{ return cache.NewListWatchFromClient(factory.Client, "pods", api.NamespaceAll, labels.Set{"DesiredState.Host": ""}.AsSelector())
Client: factory.Client,
FieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(),
Resource: "pods",
}
} }
func parseSelectorOrDie(s string) labels.Selector { func parseSelectorOrDie(s string) labels.Selector {
@ -162,20 +158,12 @@ func parseSelectorOrDie(s string) labels.Selector {
// already scheduled. // already scheduled.
// TODO: return a ListerWatcher interface instead? // TODO: return a ListerWatcher interface instead?
func (factory *ConfigFactory) createAssignedPodLW() *cache.ListWatch { func (factory *ConfigFactory) createAssignedPodLW() *cache.ListWatch {
return &cache.ListWatch{ return cache.NewListWatchFromClient(factory.Client, "pods", api.NamespaceAll, parseSelectorOrDie("DesiredState.Host!="))
Client: factory.Client,
FieldSelector: parseSelectorOrDie("DesiredState.Host!="),
Resource: "pods",
}
} }
// createMinionLW returns a cache.ListWatch that gets all changes to minions. // createMinionLW returns a cache.ListWatch that gets all changes to minions.
func (factory *ConfigFactory) createMinionLW() *cache.ListWatch { func (factory *ConfigFactory) createMinionLW() *cache.ListWatch {
return &cache.ListWatch{ return cache.NewListWatchFromClient(factory.Client, "minions", api.NamespaceAll, parseSelectorOrDie(""))
Client: factory.Client,
FieldSelector: parseSelectorOrDie(""),
Resource: "minions",
}
} }
// pollMinions lists all minions and filter out unhealthy ones, then returns // pollMinions lists all minions and filter out unhealthy ones, then returns
@ -215,11 +203,7 @@ func (factory *ConfigFactory) pollMinions() (cache.Enumerator, error) {
// createServiceLW returns a cache.ListWatch that gets all changes to services. // createServiceLW returns a cache.ListWatch that gets all changes to services.
func (factory *ConfigFactory) createServiceLW() *cache.ListWatch { func (factory *ConfigFactory) createServiceLW() *cache.ListWatch {
return &cache.ListWatch{ return cache.NewListWatchFromClient(factory.Client, "services", api.NamespaceAll, parseSelectorOrDie(""))
Client: factory.Client,
FieldSelector: parseSelectorOrDie(""),
Resource: "services",
}
} }
func (factory *ConfigFactory) makeDefaultErrorFunc(backoff *podBackoff, podQueue *cache.FIFO) func(pod *api.Pod, err error) { func (factory *ConfigFactory) makeDefaultErrorFunc(backoff *podBackoff, podQueue *cache.FIFO) func(pod *api.Pod, err error) {