diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go
index 6c61e03aad9..4abf987117e 100644
--- a/cmd/kube-controller-manager/app/core.go
+++ b/cmd/kube-controller-manager/app/core.go
@@ -45,7 +45,8 @@ import (
 
 func startEndpointController(ctx ControllerContext) (bool, error) {
 	go endpointcontroller.NewEndpointController(
-		ctx.InformerFactory.Pods().Informer(),
+		ctx.NewInformerFactory.Core().V1().Pods(),
+		ctx.NewInformerFactory.Core().V1().Services(),
 		ctx.ClientBuilder.ClientOrDie("endpoint-controller"),
 	).Run(int(ctx.Options.ConcurrentEndpointSyncs), ctx.Stop)
 	return true, nil
diff --git a/pkg/controller/endpoint/BUILD b/pkg/controller/endpoint/BUILD
index 8464c48546a..3562df38db7 100644
--- a/pkg/controller/endpoint/BUILD
+++ b/pkg/controller/endpoint/BUILD
@@ -20,19 +20,16 @@ go_library(
         "//pkg/api/v1/endpoints:go_default_library",
         "//pkg/api/v1/pod:go_default_library",
         "//pkg/client/clientset_generated/clientset:go_default_library",
-        "//pkg/client/legacylisters:go_default_library",
-        "//pkg/controller:go_default_library",
-        "//pkg/controller/informers:go_default_library",
+        "//pkg/client/informers/informers_generated/core/v1:go_default_library",
+        "//pkg/client/listers/core/v1:go_default_library",
         "//pkg/util/metrics:go_default_library",
         "//vendor:github.com/golang/glog",
         "//vendor:k8s.io/apimachinery/pkg/api/errors",
         "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
         "//vendor:k8s.io/apimachinery/pkg/labels",
-        "//vendor:k8s.io/apimachinery/pkg/runtime",
         "//vendor:k8s.io/apimachinery/pkg/util/runtime",
         "//vendor:k8s.io/apimachinery/pkg/util/sets",
         "//vendor:k8s.io/apimachinery/pkg/util/wait",
-        "//vendor:k8s.io/apimachinery/pkg/watch",
         "//vendor:k8s.io/client-go/tools/cache",
         "//vendor:k8s.io/client-go/util/workqueue",
     ],
@@ -49,6 +46,7 @@ go_test(
        "//pkg/api/v1:go_default_library",
         "//pkg/api/v1/endpoints:go_default_library",
         "//pkg/client/clientset_generated/clientset:go_default_library",
+        "//pkg/client/informers/informers_generated:go_default_library",
         "//pkg/controller:go_default_library",
         "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1",
         "//vendor:k8s.io/apimachinery/pkg/runtime",
diff --git a/pkg/controller/endpoint/endpoints_controller.go b/pkg/controller/endpoint/endpoints_controller.go
index be350f25e9d..befecc5e17c 100644
--- a/pkg/controller/endpoint/endpoints_controller.go
+++ b/pkg/controller/endpoint/endpoints_controller.go
@@ -25,11 +25,9 @@ import (
 	"k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
-	"k8s.io/apimachinery/pkg/runtime"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
 	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
-	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/tools/cache"
 	"k8s.io/client-go/util/workqueue"
 	"k8s.io/kubernetes/pkg/api/v1"
@@ -37,9 +35,8 @@
 	podutil "k8s.io/kubernetes/pkg/api/v1/pod"
 	utilpod "k8s.io/kubernetes/pkg/api/v1/pod"
 	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
-	"k8s.io/kubernetes/pkg/client/legacylisters"
-	"k8s.io/kubernetes/pkg/controller"
-	"k8s.io/kubernetes/pkg/controller/informers"
+	coreinformers "k8s.io/kubernetes/pkg/client/informers/informers_generated/core/v1"
+	corelisters "k8s.io/kubernetes/pkg/client/listers/core/v1"
 	"k8s.io/kubernetes/pkg/util/metrics"
 
 	"github.com/golang/glog"
@@ -51,10 +48,6 @@ const (
 	// shorter amount of time before a mistaken endpoint is corrected.
 	FullServiceResyncPeriod = 30 * time.Second
 
-	// We must avoid syncing service until the pod store has synced. If it hasn't synced, to
-	// avoid a hot loop, we'll wait this long between checks.
-	PodStoreSyncedPollPeriod = 100 * time.Millisecond
-
 	// An annotation on the Service denoting if the endpoints controller should
 	// go ahead and create endpoints for unready pods. This annotation is
 	// currently only used by StatefulSets, where we need the pod to be DNS
@@ -73,7 +66,7 @@ var (
 )
 
 // NewEndpointController returns a new *EndpointController.
-func NewEndpointController(podInformer cache.SharedIndexInformer, client clientset.Interface) *EndpointController {
+func NewEndpointController(podInformer coreinformers.PodInformer, serviceInformer coreinformers.ServiceInformer, client clientset.Interface) *EndpointController {
 	if client != nil && client.Core().RESTClient().GetRateLimiter() != nil {
 		metrics.RegisterMetricAndTrackRateLimiterUsage("endpoint_controller", client.Core().RESTClient().GetRateLimiter())
 	}
@@ -82,18 +75,7 @@ func NewEndpointController(podInformer cache.SharedIndexInformer, client clients
 		queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint"),
 	}
 
-	e.serviceStore.Indexer, e.serviceController = cache.NewIndexerInformer(
-		&cache.ListWatch{
-			ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
-				return e.client.Core().Services(metav1.NamespaceAll).List(options)
-			},
-			WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
-				return e.client.Core().Services(metav1.NamespaceAll).Watch(options)
-			},
-		},
-		&v1.Service{},
-		// TODO: Can we have much longer period here?
-		FullServiceResyncPeriod,
+	serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
 		cache.ResourceEventHandlerFuncs{
 			AddFunc: e.enqueueService,
 			UpdateFunc: func(old, cur interface{}) {
@@ -101,26 +83,19 @@ func NewEndpointController(podInformer cache.SharedIndexInformer, client clients
 			},
 			DeleteFunc: e.enqueueService,
 		},
-		cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
+		// TODO: Can we have much longer period here?
+		FullServiceResyncPeriod,
 	)
+	e.serviceLister = serviceInformer.Lister()
+	e.servicesSynced = serviceInformer.Informer().HasSynced
 
-	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
+	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc:    e.addPod,
 		UpdateFunc: e.updatePod,
 		DeleteFunc: e.deletePod,
 	})
-	e.podStore.Indexer = podInformer.GetIndexer()
-	e.podController = podInformer.GetController()
-	e.podStoreSynced = podInformer.HasSynced
-
-	return e
-}
-
-// NewEndpointControllerFromClient returns a new *EndpointController that runs its own informer.
-func NewEndpointControllerFromClient(client *clientset.Clientset, resyncPeriod controller.ResyncPeriodFunc) *EndpointController {
-	podInformer := informers.NewPodInformer(client, resyncPeriod())
-	e := NewEndpointController(podInformer, client)
-	e.internalPodInformer = podInformer
+	e.podLister = podInformer.Lister()
+	e.podsSynced = podInformer.Informer().HasSynced
 
 	return e
 }
@@ -129,15 +104,19 @@
 type EndpointController struct {
 	client clientset.Interface
 
-	serviceStore listers.StoreToServiceLister
-	podStore     listers.StoreToPodLister
+	// serviceLister is able to list/get services and is populated by the shared informer passed to
+	// NewEndpointController.
+	serviceLister corelisters.ServiceLister
+	// servicesSynced returns true if the service shared informer has been synced at least once.
+	// Added as a member to the struct to allow injection for testing.
+	servicesSynced cache.InformerSynced
 
-	// internalPodInformer is used to hold a personal informer. If we're using
-	// a normal shared informer, then the informer will be started for us. If
-	// we have a personal informer, we must start it ourselves. If you start
-	// the controller using NewEndpointController(passing SharedInformer), this
-	// will be null
-	internalPodInformer cache.SharedIndexInformer
+	// podLister is able to list/get pods and is populated by the shared informer passed to
+	// NewEndpointController.
+	podLister corelisters.PodLister
+	// podsSynced returns true if the pod shared informer has been synced at least once.
+	// Added as a member to the struct to allow injection for testing.
+	podsSynced cache.InformerSynced
 
 	// Services that need to be updated. A channel is inappropriate here,
 	// because it allows services with lots of pods to be serviced much
@@ -145,14 +124,6 @@ type EndpointController struct {
 	// service that's inserted multiple times to be processed more than
 	// necessary.
 	queue workqueue.RateLimitingInterface
-
-	// Since we join two objects, we'll watch both of them with
-	// controllers.
-	serviceController cache.Controller
-	podController     cache.Controller
-	// podStoreSynced returns true if the pod store has been synced at least once.
-	// Added as a member to the struct to allow injection for testing.
-	podStoreSynced func() bool
 }
 
 // Runs e; will not return until stopCh is closed. workers determines how many
@@ -161,10 +132,8 @@ func (e *EndpointController) Run(workers int, stopCh <-chan struct{}) {
 	defer utilruntime.HandleCrash()
 	defer e.queue.ShutDown()
 
-	go e.serviceController.Run(stopCh)
-	go e.podController.Run(stopCh)
-
-	if !cache.WaitForCacheSync(stopCh, e.podStoreSynced) {
+	if !cache.WaitForCacheSync(stopCh, e.podsSynced, e.servicesSynced) {
+		utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
 		return
 	}
 
@@ -177,16 +146,12 @@
 		e.checkLeftoverEndpoints()
 	}()
 
-	if e.internalPodInformer != nil {
-		go e.internalPodInformer.Run(stopCh)
-	}
-
 	<-stopCh
 }
 
 func (e *EndpointController) getPodServiceMemberships(pod *v1.Pod) (sets.String, error) {
 	set := sets.String{}
-	services, err := e.serviceStore.GetPodServices(pod)
+	services, err := e.serviceLister.GetPodServices(pod)
 	if err != nil {
 		// don't log this error because this function makes pointless
 		// errors when no services match.
@@ -338,8 +303,12 @@ func (e *EndpointController) syncService(key string) error {
 		glog.V(4).Infof("Finished syncing service %q endpoints. (%v)", key, time.Now().Sub(startTime))
 	}()
 
-	obj, exists, err := e.serviceStore.Indexer.GetByKey(key)
-	if err != nil || !exists {
+	namespace, name, err := cache.SplitMetaNamespaceKey(key)
+	if err != nil {
+		return err
+	}
+	service, err := e.serviceLister.Services(namespace).Get(name)
+	if err != nil {
 		// Delete the corresponding endpoint, as the service has been deleted.
 		// TODO: Please note that this will delete an endpoint when a
 		// service is deleted. However, if we're down at the time when
@@ -358,7 +327,6 @@ func (e *EndpointController) syncService(key string) error {
 		return nil
 	}
 
-	service := obj.(*v1.Service)
 	if service.Spec.Selector == nil {
 		// services without a selector receive no endpoints from this controller;
 		// these services will receive the endpoints that are created out-of-band via the REST API.
@@ -366,7 +334,7 @@ func (e *EndpointController) syncService(key string) error {
 	}
 
 	glog.V(5).Infof("About to update endpoints for service %q", key)
-	pods, err := e.podStore.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelectorPreValidated())
+	pods, err := e.podLister.Pods(service.Namespace).List(labels.Set(service.Spec.Selector).AsSelectorPreValidated())
 	if err != nil {
 		// Since we're getting stuff from a local cache, it is
 		// basically impossible to get this error.
diff --git a/pkg/controller/endpoint/endpoints_controller_test.go b/pkg/controller/endpoint/endpoints_controller_test.go
index 5ebb90725de..613b519c61b 100644
--- a/pkg/controller/endpoint/endpoints_controller_test.go
+++ b/pkg/controller/endpoint/endpoints_controller_test.go
@@ -33,6 +33,7 @@ import (
 	"k8s.io/kubernetes/pkg/api/v1"
 	endptspkg "k8s.io/kubernetes/pkg/api/v1/endpoints"
 	"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
+	informers "k8s.io/kubernetes/pkg/client/informers/informers_generated"
 	"k8s.io/kubernetes/pkg/controller"
 )
 
@@ -92,6 +93,25 @@ func makeTestServer(t *testing.T, namespace string, endpointsResponse serverResp
 	return httptest.NewServer(mux), &fakeEndpointsHandler
 }
 
+type endpointController struct {
+	*EndpointController
+	podStore     cache.Store
+	serviceStore cache.Store
+}
+
+func newController(url string) *endpointController {
+	client := clientset.NewForConfigOrDie(&restclient.Config{Host: url, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
+	informerFactory := informers.NewSharedInformerFactory(nil, client, controller.NoResyncPeriodFunc())
+	endpoints := NewEndpointController(informerFactory.Core().V1().Pods(), informerFactory.Core().V1().Services(), client)
+	endpoints.podsSynced = alwaysReady
+	endpoints.servicesSynced = alwaysReady
+	return &endpointController{
+		endpoints,
+		informerFactory.Core().V1().Pods().Informer().GetStore(),
+		informerFactory.Core().V1().Services().Informer().GetStore(),
+	}
+}
+
 func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
 	ns := metav1.NamespaceDefault
 	testServer, endpointsHandler := makeTestServer(t, ns,
@@ -107,10 +127,8 @@ func TestSyncEndpointsItemsPreserveNoSelector(t *testing.T) {
 		}},
 	}})
 	defer testServer.Close()
-	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
-	endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
-	endpoints.podStoreSynced = alwaysReady
-	endpoints.serviceStore.Indexer.Add(&v1.Service{
+	endpoints := newController(testServer.URL)
+	endpoints.serviceStore.Add(&v1.Service{
 		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
 		Spec:       v1.ServiceSpec{Ports: []v1.ServicePort{{Port: 80}}},
 	})
@@ -140,9 +158,7 @@ func TestCheckLeftoverEndpoints(t *testing.T) {
 		}},
 	}})
 	defer testServer.Close()
-	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
-	endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
-	endpoints.podStoreSynced = alwaysReady
+	endpoints := newController(testServer.URL)
 	endpoints.checkLeftoverEndpoints()
 
 	if e, a := 1, endpoints.queue.Len(); e != a {
@@ -169,12 +185,10 @@ func TestSyncEndpointsProtocolTCP(t *testing.T) {
 		}},
 	}})
 	defer testServer.Close()
-	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
-	endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
-	endpoints.podStoreSynced = alwaysReady
+	endpoints := newController(testServer.URL)
 
-	addPods(endpoints.podStore.Indexer, ns, 1, 1, 0)
-	endpoints.serviceStore.Indexer.Add(&v1.Service{
+	addPods(endpoints.podStore, ns, 1, 1, 0)
+	endpoints.serviceStore.Add(&v1.Service{
 		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
 		Spec: v1.ServiceSpec{
 			Selector: map[string]string{},
@@ -212,11 +226,9 @@ func TestSyncEndpointsProtocolUDP(t *testing.T) {
 		}},
 	}})
 	defer testServer.Close()
-	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
-	endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
-	endpoints.podStoreSynced = alwaysReady
-	addPods(endpoints.podStore.Indexer, ns, 1, 1, 0)
-	endpoints.serviceStore.Indexer.Add(&v1.Service{
+	endpoints := newController(testServer.URL)
+	addPods(endpoints.podStore, ns, 1, 1, 0)
+	endpoints.serviceStore.Add(&v1.Service{
 		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
 		Spec: v1.ServiceSpec{
 			Selector: map[string]string{},
@@ -251,11 +263,9 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAll(t *testing.T) {
 		Subsets: []v1.EndpointSubset{},
 	}})
 	defer testServer.Close()
-	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
-	endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
-	endpoints.podStoreSynced = alwaysReady
-	addPods(endpoints.podStore.Indexer, ns, 1, 1, 0)
-	endpoints.serviceStore.Indexer.Add(&v1.Service{
+	endpoints := newController(testServer.URL)
+	addPods(endpoints.podStore, ns, 1, 1, 0)
+	endpoints.serviceStore.Add(&v1.Service{
 		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
 		Spec: v1.ServiceSpec{
 			Selector: map[string]string{},
@@ -289,11 +299,9 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllNotReady(t *testing.T) {
 		Subsets: []v1.EndpointSubset{},
 	}})
 	defer testServer.Close()
-	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
-	endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
-	endpoints.podStoreSynced = alwaysReady
-	addPods(endpoints.podStore.Indexer, ns, 0, 1, 1)
-	endpoints.serviceStore.Indexer.Add(&v1.Service{
+	endpoints := newController(testServer.URL)
+	addPods(endpoints.podStore, ns, 0, 1, 1)
+	endpoints.serviceStore.Add(&v1.Service{
 		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
 		Spec: v1.ServiceSpec{
 			Selector: map[string]string{},
@@ -327,11 +335,9 @@ func TestSyncEndpointsItemsEmptySelectorSelectsAllMixed(t *testing.T) {
 		Subsets: []v1.EndpointSubset{},
 	}})
 	defer testServer.Close()
-	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
-	endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
-	endpoints.podStoreSynced = alwaysReady
-	addPods(endpoints.podStore.Indexer, ns, 1, 1, 1)
-	endpoints.serviceStore.Indexer.Add(&v1.Service{
+	endpoints := newController(testServer.URL)
+	addPods(endpoints.podStore, ns, 1, 1, 1)
+	endpoints.serviceStore.Add(&v1.Service{
 		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
 		Spec: v1.ServiceSpec{
 			Selector: map[string]string{},
@@ -369,11 +375,9 @@ func TestSyncEndpointsItemsPreexisting(t *testing.T) {
 		}},
 	}})
 	defer testServer.Close()
-	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
-	endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
-	endpoints.podStoreSynced = alwaysReady
-	addPods(endpoints.podStore.Indexer, ns, 1, 1, 0)
-	endpoints.serviceStore.Indexer.Add(&v1.Service{
+	endpoints := newController(testServer.URL)
+	addPods(endpoints.podStore, ns, 1, 1, 0)
+	endpoints.serviceStore.Add(&v1.Service{
 		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
 		Spec: v1.ServiceSpec{
 			Selector: map[string]string{"foo": "bar"},
@@ -410,11 +414,9 @@ func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
 		}},
 	}})
 	defer testServer.Close()
-	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
-	endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
-	endpoints.podStoreSynced = alwaysReady
-	addPods(endpoints.podStore.Indexer, metav1.NamespaceDefault, 1, 1, 0)
-	endpoints.serviceStore.Indexer.Add(&v1.Service{
+	endpoints := newController(testServer.URL)
+	addPods(endpoints.podStore, metav1.NamespaceDefault, 1, 1, 0)
+	endpoints.serviceStore.Add(&v1.Service{
 		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: metav1.NamespaceDefault},
 		Spec: v1.ServiceSpec{
 			Selector: map[string]string{"foo": "bar"},
@@ -430,12 +432,10 @@ func TestSyncEndpointsItems(t *testing.T) {
 	testServer, endpointsHandler := makeTestServer(t, ns,
 		serverResponse{http.StatusOK, &v1.Endpoints{}})
 	defer testServer.Close()
-	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
-	endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
-	endpoints.podStoreSynced = alwaysReady
-	addPods(endpoints.podStore.Indexer, ns, 3, 2, 0)
-	addPods(endpoints.podStore.Indexer, "blah", 5, 2, 0) // make sure these aren't found!
-	endpoints.serviceStore.Indexer.Add(&v1.Service{
+	endpoints := newController(testServer.URL)
+	addPods(endpoints.podStore, ns, 3, 2, 0)
+	addPods(endpoints.podStore, "blah", 5, 2, 0) // make sure these aren't found!
+	endpoints.serviceStore.Add(&v1.Service{
 		ObjectMeta: metav1.ObjectMeta{Name: "foo", Namespace: ns},
 		Spec: v1.ServiceSpec{
 			Selector: map[string]string{"foo": "bar"},
@@ -473,12 +473,10 @@ func TestSyncEndpointsItemsWithLabels(t *testing.T) {
 	testServer, endpointsHandler := makeTestServer(t, ns,
 		serverResponse{http.StatusOK, &v1.Endpoints{}})
 	defer testServer.Close()
-	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
-	endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
-	endpoints.podStoreSynced = alwaysReady
-	addPods(endpoints.podStore.Indexer, ns, 3, 2, 0)
+	endpoints := newController(testServer.URL)
+	addPods(endpoints.podStore, ns, 3, 2, 0)
 	serviceLabels := map[string]string{"foo": "bar"}
-	endpoints.serviceStore.Indexer.Add(&v1.Service{
+	endpoints.serviceStore.Add(&v1.Service{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      "foo",
 			Namespace: ns,
@@ -534,12 +532,10 @@ func TestSyncEndpointsItemsPreexistingLabelsChange(t *testing.T) {
 		}},
 	}})
 	defer testServer.Close()
-	client := clientset.NewForConfigOrDie(&restclient.Config{Host: testServer.URL, ContentConfig: restclient.ContentConfig{GroupVersion: &api.Registry.GroupOrDie(v1.GroupName).GroupVersion}})
-	endpoints := NewEndpointControllerFromClient(client, controller.NoResyncPeriodFunc)
-	endpoints.podStoreSynced = alwaysReady
-	addPods(endpoints.podStore.Indexer, ns, 1, 1, 0)
+	endpoints := newController(testServer.URL)
+	addPods(endpoints.podStore, ns, 1, 1, 0)
 	serviceLabels := map[string]string{"baz": "blah"}
-	endpoints.serviceStore.Indexer.Add(&v1.Service{
+	endpoints.serviceStore.Add(&v1.Service{
 		ObjectMeta: metav1.ObjectMeta{
 			Name:      "foo",
 			Namespace: ns,
diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go
index f6ab9cca52b..dffc97d9710 100644
--- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go
+++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/policy.go
@@ -280,7 +280,7 @@ func ClusterRoles() []rbac.ClusterRole {
 			rbac.NewRule("update").Groups(legacyGroup).Resources("endpoints", "serviceaccounts").RuleOrDie(),
 
 			rbac.NewRule("list", "watch").Groups("*").Resources("namespaces", "nodes", "persistentvolumeclaims",
-				"persistentvolumes", "pods", "secrets", "serviceaccounts", "replicationcontrollers").RuleOrDie(),
+				"persistentvolumes", "pods", "secrets", "services", "serviceaccounts", "replicationcontrollers").RuleOrDie(),
 			rbac.NewRule("list", "watch").Groups(extensionsGroup).Resources("daemonsets", "deployments", "replicasets").RuleOrDie(),
 			rbac.NewRule("list", "watch").Groups(batchGroup).Resources("jobs", "cronjobs").RuleOrDie(),
 		},
diff --git a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml
index 81a60a4f9fc..f1695e20bb9 100644
--- a/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml
+++ b/plugin/pkg/auth/authorizer/rbac/bootstrappolicy/testdata/cluster-roles.yaml
@@ -454,6 +454,7 @@ items:
     - replicationcontrollers
     - secrets
     - serviceaccounts
+    - services
     verbs:
     - list
     - watch
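
Reviewer note: the wiring this patch adopts — a shared informer factory handing each controller its informers, with the controller keeping only a Lister and a HasSynced func and gating work on WaitForCacheSync — later became the standard client-go idiom. Below is a minimal standalone sketch of that pattern written against the public client-go API rather than the internal informers_generated packages used in this PR; the kubeconfig path and the 30-second resync period are assumptions for illustration only.

```go
package main

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/labels"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Build a client from the local kubeconfig (assumed path, for a runnable example).
	config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(config)

	// One shared factory hands out informers for every resource type; all
	// consumers share the same watch connections and local caches.
	factory := informers.NewSharedInformerFactory(client, 30*time.Second)
	podInformer := factory.Core().V1().Pods()
	serviceInformer := factory.Core().V1().Services()

	// Register handlers before starting the factory, as the patched controller does.
	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			// A real controller would enqueue a work item here.
		},
	})

	// Keep only the lister and HasSynced funcs, mirroring podLister/podsSynced
	// and serviceLister/servicesSynced in the patched EndpointController.
	podLister := podInformer.Lister()
	podsSynced := podInformer.Informer().HasSynced
	servicesSynced := serviceInformer.Informer().HasSynced

	stopCh := make(chan struct{})
	defer close(stopCh)
	factory.Start(stopCh)

	// Gate all work on warm caches, exactly as the patched Run() now does.
	if !cache.WaitForCacheSync(stopCh, podsSynced, servicesSynced) {
		panic("timed out waiting for caches to sync")
	}

	// Reads go through the lister, i.e. the local cache, not the API server.
	pods, err := podLister.Pods(v1.NamespaceDefault).List(labels.Everything())
	if err != nil {
		panic(err)
	}
	fmt.Printf("caches synced; %d pods in the default namespace\n", len(pods))
}
```

This also explains the bootstrap-policy change at the end of the diff: because the controller now watches Services through a shared informer instead of its own list/watch, the controller-manager role needs list/watch on services, hence the added "services" entry in policy.go and the regenerated cluster-roles.yaml testdata.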