diff --git a/pkg/client/cache/listwatch.go b/pkg/client/cache/listwatch.go index f6e724e2a3f..96fdecaa040 100644 --- a/pkg/client/cache/listwatch.go +++ b/pkg/client/cache/listwatch.go @@ -23,33 +23,37 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) +// ListFunc knows how to list resources +type ListFunc func() (runtime.Object, error) + +// WatchFunc knows how to watch resources +type WatchFunc func(resourceVersion string) (watch.Interface, error) + // ListWatch knows how to list and watch a set of apiserver resources. It satisfies the ListerWatcher interface. -// It is a convenience function for users of NewReflector, etc. Client must not be nil. +// It is a convenience function for users of NewReflector, etc. +// ListFunc and WatchFunc must not be nil type ListWatch struct { - Client *client.Client - FieldSelector labels.Selector - Resource string - Namespace string + ListFunc ListFunc + WatchFunc WatchFunc } -// ListWatch knows how to list and watch a set of apiserver resources. +// ListWatchFromClient creates a new ListWatch from the specified client, resource, namespace and field selector +func NewListWatchFromClient(client *client.Client, resource string, namespace string, fieldSelector labels.Selector) *ListWatch { + listFunc := func() (runtime.Object, error) { + return client.Get().Namespace(namespace).Resource(resource).SelectorParam("fields", fieldSelector).Do().Get() + } + watchFunc := func(resourceVersion string) (watch.Interface, error) { + return client.Get().Prefix("watch").Namespace(namespace).Resource(resource).SelectorParam("fields", fieldSelector).Param("resourceVersion", resourceVersion).Watch() + } + return &ListWatch{ListFunc: listFunc, WatchFunc: watchFunc} +} + +// List a set of apiserver resources func (lw *ListWatch) List() (runtime.Object, error) { - return lw.Client. - Get(). - Namespace(lw.Namespace). - Resource(lw.Resource). - SelectorParam("fields", lw.FieldSelector). - Do(). - Get() + return lw.ListFunc() } +// Watch a set of apiserver resources func (lw *ListWatch) Watch(resourceVersion string) (watch.Interface, error) { - return lw.Client. - Get(). - Prefix("watch"). - Namespace(lw.Namespace). - Resource(lw.Resource). - SelectorParam("fields", lw.FieldSelector). - Param("resourceVersion", resourceVersion). - Watch() + return lw.WatchFunc(resourceVersion) } diff --git a/pkg/client/cache/listwatch_test.go b/pkg/client/cache/listwatch_test.go index c61412069f8..43529761d9d 100644 --- a/pkg/client/cache/listwatch_test.go +++ b/pkg/client/cache/listwatch_test.go @@ -70,33 +70,31 @@ func buildLocation(resourcePath string, query url.Values) string { func TestListWatchesCanList(t *testing.T) { table := []struct { - location string - lw ListWatch + location string + resource string + namespace string + fieldSelector labels.Selector }{ // Minion { - location: buildLocation(buildResourcePath("", api.NamespaceAll, "minions"), buildQueryValues(api.NamespaceAll, nil)), - lw: ListWatch{ - FieldSelector: parseSelectorOrDie(""), - Resource: "minions", - }, + location: buildLocation(buildResourcePath("", api.NamespaceAll, "minions"), buildQueryValues(api.NamespaceAll, nil)), + resource: "minions", + namespace: api.NamespaceAll, + fieldSelector: parseSelectorOrDie(""), }, // pod with "assigned" field selector. { - location: buildLocation(buildResourcePath("", api.NamespaceAll, "pods"), buildQueryValues(api.NamespaceAll, url.Values{"fields": []string{"DesiredState.Host="}})), - lw: ListWatch{ - FieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(), - Resource: "pods", - }, + location: buildLocation(buildResourcePath("", api.NamespaceAll, "pods"), buildQueryValues(api.NamespaceAll, url.Values{"fields": []string{"DesiredState.Host="}})), + resource: "pods", + namespace: api.NamespaceAll, + fieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(), }, // pod in namespace "foo" { - location: buildLocation(buildResourcePath("", "foo", "pods"), buildQueryValues("foo", url.Values{"fields": []string{"DesiredState.Host="}})), - lw: ListWatch{ - FieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(), - Resource: "pods", - Namespace: "foo", - }, + location: buildLocation(buildResourcePath("", "foo", "pods"), buildQueryValues("foo", url.Values{"fields": []string{"DesiredState.Host="}})), + resource: "pods", + namespace: "foo", + fieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(), }, } for _, item := range table { @@ -107,54 +105,52 @@ func TestListWatchesCanList(t *testing.T) { } server := httptest.NewServer(&handler) defer server.Close() - item.lw.Client = client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()}) + client := client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()}) + lw := NewListWatchFromClient(client, item.resource, item.namespace, item.fieldSelector) // This test merely tests that the correct request is made. - item.lw.List() + lw.List() handler.ValidateRequest(t, item.location, "GET", nil) } } func TestListWatchesCanWatch(t *testing.T) { table := []struct { - rv string - location string - lw ListWatch + rv string + location string + resource string + namespace string + fieldSelector labels.Selector }{ // Minion { - location: buildLocation(buildResourcePath("watch", api.NamespaceAll, "minions"), buildQueryValues(api.NamespaceAll, url.Values{"resourceVersion": []string{""}})), - rv: "", - lw: ListWatch{ - FieldSelector: parseSelectorOrDie(""), - Resource: "minions", - }, + location: buildLocation(buildResourcePath("watch", api.NamespaceAll, "minions"), buildQueryValues(api.NamespaceAll, url.Values{"resourceVersion": []string{""}})), + rv: "", + resource: "minions", + namespace: api.NamespaceAll, + fieldSelector: parseSelectorOrDie(""), }, { - location: buildLocation(buildResourcePath("watch", api.NamespaceAll, "minions"), buildQueryValues(api.NamespaceAll, url.Values{"resourceVersion": []string{"42"}})), - rv: "42", - lw: ListWatch{ - FieldSelector: parseSelectorOrDie(""), - Resource: "minions", - }, + location: buildLocation(buildResourcePath("watch", api.NamespaceAll, "minions"), buildQueryValues(api.NamespaceAll, url.Values{"resourceVersion": []string{"42"}})), + rv: "42", + resource: "minions", + namespace: api.NamespaceAll, + fieldSelector: parseSelectorOrDie(""), }, // pod with "assigned" field selector. { - location: buildLocation(buildResourcePath("watch", api.NamespaceAll, "pods"), buildQueryValues(api.NamespaceAll, url.Values{"fields": []string{"DesiredState.Host="}, "resourceVersion": []string{"0"}})), - rv: "0", - lw: ListWatch{ - FieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(), - Resource: "pods", - }, + location: buildLocation(buildResourcePath("watch", api.NamespaceAll, "pods"), buildQueryValues(api.NamespaceAll, url.Values{"fields": []string{"DesiredState.Host="}, "resourceVersion": []string{"0"}})), + rv: "0", + resource: "pods", + namespace: api.NamespaceAll, + fieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(), }, // pod with namespace foo and assigned field selector { - location: buildLocation(buildResourcePath("watch", "foo", "pods"), buildQueryValues("foo", url.Values{"fields": []string{"DesiredState.Host="}, "resourceVersion": []string{"0"}})), - rv: "0", - lw: ListWatch{ - FieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(), - Resource: "pods", - Namespace: "foo", - }, + location: buildLocation(buildResourcePath("watch", "foo", "pods"), buildQueryValues("foo", url.Values{"fields": []string{"DesiredState.Host="}, "resourceVersion": []string{"0"}})), + rv: "0", + resource: "pods", + namespace: "foo", + fieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(), }, } @@ -166,10 +162,10 @@ func TestListWatchesCanWatch(t *testing.T) { } server := httptest.NewServer(&handler) defer server.Close() - item.lw.Client = client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()}) - + client := client.NewOrDie(&client.Config{Host: server.URL, Version: testapi.Version()}) + lw := NewListWatchFromClient(client, item.resource, item.namespace, item.fieldSelector) // This test merely tests that the correct request is made. - item.lw.Watch(item.rv) + lw.Watch(item.rv) handler.ValidateRequest(t, item.location, "GET", nil) } } diff --git a/pkg/kubelet/config/apiserver.go b/pkg/kubelet/config/apiserver.go index 5057ac9518f..cfcac66f008 100644 --- a/pkg/kubelet/config/apiserver.go +++ b/pkg/kubelet/config/apiserver.go @@ -28,11 +28,7 @@ import ( // NewSourceApiserver creates a config source that watches and pulls from the apiserver. func NewSourceApiserver(client *client.Client, hostname string, updates chan<- interface{}) { - lw := &cache.ListWatch{ - Client: client, - FieldSelector: labels.OneTermEqualSelector("Status.Host", hostname), - Resource: "pods", - } + lw := cache.NewListWatchFromClient(client, "pods", api.NamespaceAll, labels.OneTermEqualSelector("Status.Host", hostname)) newSourceApiserverFromLW(lw, updates) } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index b490fd1f241..f9c0cc5f918 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -95,7 +95,7 @@ func NewMainKubelet( serviceStore := cache.NewStore(cache.MetaNamespaceKeyFunc) if kubeClient != nil { - cache.NewReflector(&cache.ListWatch{kubeClient, labels.Everything(), "services", api.NamespaceAll}, &api.Service{}, serviceStore).Run() + cache.NewReflector(cache.NewListWatchFromClient(kubeClient, "services", api.NamespaceAll, labels.Everything()), &api.Service{}, serviceStore).Run() } serviceLister := &cache.StoreToServiceLister{serviceStore} diff --git a/plugin/pkg/admission/namespace/autoprovision/admission.go b/plugin/pkg/admission/namespace/autoprovision/admission.go index 42f724b6d98..0cfd7bbe4e0 100644 --- a/plugin/pkg/admission/namespace/autoprovision/admission.go +++ b/plugin/pkg/admission/namespace/autoprovision/admission.go @@ -26,6 +26,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) func init() { @@ -79,9 +81,12 @@ func NewProvision(c client.Interface) admission.Interface { store := cache.NewStore(cache.MetaNamespaceKeyFunc) reflector := cache.NewReflector( &cache.ListWatch{ - Client: c.(*client.Client), - FieldSelector: labels.Everything(), - Resource: "namespaces", + ListFunc: func() (runtime.Object, error) { + return c.Namespaces().List(labels.Everything()) + }, + WatchFunc: func(resourceVersion string) (watch.Interface, error) { + return c.Namespaces().Watch(labels.Everything(), labels.Everything(), resourceVersion) + }, }, &api.Namespace{}, store, diff --git a/plugin/pkg/admission/namespace/exists/admission.go b/plugin/pkg/admission/namespace/exists/admission.go index 453b0860cd3..5a806d984d0 100644 --- a/plugin/pkg/admission/namespace/exists/admission.go +++ b/plugin/pkg/admission/namespace/exists/admission.go @@ -28,6 +28,8 @@ import ( "github.com/GoogleCloudPlatform/kubernetes/pkg/client" "github.com/GoogleCloudPlatform/kubernetes/pkg/client/cache" "github.com/GoogleCloudPlatform/kubernetes/pkg/labels" + "github.com/GoogleCloudPlatform/kubernetes/pkg/runtime" + "github.com/GoogleCloudPlatform/kubernetes/pkg/watch" ) func init() { @@ -83,9 +85,12 @@ func NewExists(c client.Interface) admission.Interface { // TODO: look into a list/watch that can work with client.Interface, maybe pass it a ListFunc and a WatchFunc reflector := cache.NewReflector( &cache.ListWatch{ - Client: c.(*client.Client), - FieldSelector: labels.Everything(), - Resource: "namespaces", + ListFunc: func() (runtime.Object, error) { + return c.Namespaces().List(labels.Everything()) + }, + WatchFunc: func(resourceVersion string) (watch.Interface, error) { + return c.Namespaces().Watch(labels.Everything(), labels.Everything(), resourceVersion) + }, }, &api.Namespace{}, store, diff --git a/plugin/pkg/scheduler/factory/factory.go b/plugin/pkg/scheduler/factory/factory.go index 155dcbb7b10..7dad4479850 100644 --- a/plugin/pkg/scheduler/factory/factory.go +++ b/plugin/pkg/scheduler/factory/factory.go @@ -143,11 +143,7 @@ func (f *ConfigFactory) CreateFromKeys(predicateKeys, priorityKeys util.StringSe // createUnassignedPodLW returns a cache.ListWatch that finds all pods that need to be // scheduled. func (factory *ConfigFactory) createUnassignedPodLW() *cache.ListWatch { - return &cache.ListWatch{ - Client: factory.Client, - FieldSelector: labels.Set{"DesiredState.Host": ""}.AsSelector(), - Resource: "pods", - } + return cache.NewListWatchFromClient(factory.Client, "pods", api.NamespaceAll, labels.Set{"DesiredState.Host": ""}.AsSelector()) } func parseSelectorOrDie(s string) labels.Selector { @@ -162,20 +158,12 @@ func parseSelectorOrDie(s string) labels.Selector { // already scheduled. // TODO: return a ListerWatcher interface instead? func (factory *ConfigFactory) createAssignedPodLW() *cache.ListWatch { - return &cache.ListWatch{ - Client: factory.Client, - FieldSelector: parseSelectorOrDie("DesiredState.Host!="), - Resource: "pods", - } + return cache.NewListWatchFromClient(factory.Client, "pods", api.NamespaceAll, parseSelectorOrDie("DesiredState.Host!=")) } // createMinionLW returns a cache.ListWatch that gets all changes to minions. func (factory *ConfigFactory) createMinionLW() *cache.ListWatch { - return &cache.ListWatch{ - Client: factory.Client, - FieldSelector: parseSelectorOrDie(""), - Resource: "minions", - } + return cache.NewListWatchFromClient(factory.Client, "minions", api.NamespaceAll, parseSelectorOrDie("")) } // pollMinions lists all minions and filter out unhealthy ones, then returns @@ -215,11 +203,7 @@ func (factory *ConfigFactory) pollMinions() (cache.Enumerator, error) { // createServiceLW returns a cache.ListWatch that gets all changes to services. func (factory *ConfigFactory) createServiceLW() *cache.ListWatch { - return &cache.ListWatch{ - Client: factory.Client, - FieldSelector: parseSelectorOrDie(""), - Resource: "services", - } + return cache.NewListWatchFromClient(factory.Client, "services", api.NamespaceAll, parseSelectorOrDie("")) } func (factory *ConfigFactory) makeDefaultErrorFunc(backoff *podBackoff, podQueue *cache.FIFO) func(pod *api.Pod, err error) {