diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index a4c1539976e..eb8c722dd88 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -36,6 +36,8 @@ import ( "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/apimachinery/registered" "k8s.io/kubernetes/pkg/apis/batch" + "k8s.io/kubernetes/pkg/apis/extensions" + metav1 "k8s.io/kubernetes/pkg/apis/meta/v1" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5" v1core "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/typed/core/v1" "k8s.io/kubernetes/pkg/client/leaderelection" @@ -393,44 +395,26 @@ func StartControllers(s *options.CMServer, rootClientBuilder, clientBuilder cont namespaceKubeClient := clientBuilder.ClientOrDie("namespace-controller") namespaceClientPool := dynamic.NewClientPool(rootClientBuilder.ConfigOrDie("namespace-controller"), restMapper, dynamic.LegacyAPIPathResolverFunc) // TODO: consider using a list-watch + cache here rather than polling - gvrFn := func() (map[schema.GroupVersionResource]struct{}, error) { - resources, err := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources() - if err != nil { - // best effort extraction - gvrs, _ := discovery.GroupVersionResources(resources) - return gvrs, fmt.Errorf("failed to get supported namespaced resources: %v", err) - } - gvrs, err := discovery.GroupVersionResources(resources) - if err != nil { - return gvrs, fmt.Errorf("failed to parse supported namespaced resources: %v", err) - } - return gvrs, nil - } - rsrcs, err := namespaceKubeClient.Discovery().ServerResources() + resources, err := namespaceKubeClient.Discovery().ServerResources() if err != nil { - return fmt.Errorf("failed to get group version resources: %v", err) + return fmt.Errorf("failed to get preferred server resources: %v", err) } - tprFound := false -searchThirdPartyResource: - for _, rsrcList := range rsrcs { - for ix := range rsrcList.APIResources { - rsrc := &rsrcList.APIResources[ix] - if rsrc.Kind == "ThirdPartyResource" { - tprFound = true - break searchThirdPartyResource - } - } + gvrs, err := discovery.GroupVersionResources(resources) + if err != nil { + return fmt.Errorf("failed to parse preferred server resources: %v", err) } - if !tprFound { - gvr, err := gvrFn() + discoverResourcesFn := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources + if _, found := gvrs[extensions.SchemeGroupVersion.WithResource("thirdpartyresource")]; found { + // make discovery static + snapshot, err := discoverResourcesFn() if err != nil { - return fmt.Errorf("failed to get resources: %v", err) + return fmt.Errorf("failed to get server resources: %v", err) } - gvrFn = func() (map[schema.GroupVersionResource]struct{}, error) { - return gvr, nil + discoverResourcesFn = func() ([]*metav1.APIResourceList, error) { + return snapshot, nil } } - namespaceController := namespacecontroller.NewNamespaceController(namespaceKubeClient, namespaceClientPool, gvrFn, s.NamespaceSyncPeriod.Duration, v1.FinalizerKubernetes) + namespaceController := namespacecontroller.NewNamespaceController(namespaceKubeClient, namespaceClientPool, discoverResourcesFn, s.NamespaceSyncPeriod.Duration, v1.FinalizerKubernetes) go namespaceController.Run(int(s.ConcurrentNamespaceSyncs), stop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) @@ -567,9 +551,10 @@ searchThirdPartyResource: if err != nil { return fmt.Errorf("failed to get supported resources from server: %v", err) } - groupVersionResources, err := discovery.GroupVersionResources(preferredResources) + deletableResources := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"delete"}}, preferredResources) + deletableGroupVersionResources, err := discovery.GroupVersionResources(deletableResources) if err != nil { - glog.Fatalf("Failed to parse supported resources from server: %v", err) + glog.Errorf("Failed to parse resources from server: %v", err) } config := rootClientBuilder.ConfigOrDie("generic-garbage-collector") @@ -577,7 +562,7 @@ searchThirdPartyResource: metaOnlyClientPool := dynamic.NewClientPool(config, restMapper, dynamic.LegacyAPIPathResolverFunc) config.ContentConfig = dynamic.ContentConfig() clientPool := dynamic.NewClientPool(config, restMapper, dynamic.LegacyAPIPathResolverFunc) - garbageCollector, err := garbagecollector.NewGarbageCollector(metaOnlyClientPool, clientPool, restMapper, groupVersionResources) + garbageCollector, err := garbagecollector.NewGarbageCollector(metaOnlyClientPool, clientPool, restMapper, deletableGroupVersionResources) if err != nil { glog.Errorf("Failed to start the generic garbage collector: %v", err) } else { diff --git a/pkg/controller/garbagecollector/garbagecollector.go b/pkg/controller/garbagecollector/garbagecollector.go index bc400d07e18..e94367fad7f 100644 --- a/pkg/controller/garbagecollector/garbagecollector.go +++ b/pkg/controller/garbagecollector/garbagecollector.go @@ -537,7 +537,7 @@ var ignoredResources = map[schema.GroupVersionResource]struct{}{ schema.GroupVersionResource{Group: "authorization.k8s.io", Version: "v1beta1", Resource: "localsubjectaccessreviews"}: {}, } -func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynamic.ClientPool, mapper meta.RESTMapper, resources map[schema.GroupVersionResource]struct{}) (*GarbageCollector, error) { +func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynamic.ClientPool, mapper meta.RESTMapper, deletableResources map[schema.GroupVersionResource]struct{}) (*GarbageCollector, error) { gc := &GarbageCollector{ metaOnlyClientPool: metaOnlyClientPool, clientPool: clientPool, @@ -545,8 +545,8 @@ func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynam clock: clock.RealClock{}, dirtyQueue: workqueue.NewTimedWorkQueue(), orphanQueue: workqueue.NewTimedWorkQueue(), - registeredRateLimiter: NewRegisteredRateLimiter(resources), - registeredRateLimiterForMonitors: NewRegisteredRateLimiter(resources), + registeredRateLimiter: NewRegisteredRateLimiter(deletableResources), + registeredRateLimiterForMonitors: NewRegisteredRateLimiter(deletableResources), absentOwnerCache: NewUIDCache(500), } gc.propagator = &Propagator{ @@ -557,7 +557,7 @@ func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynam }, gc: gc, } - for resource := range resources { + for resource := range deletableResources { if _, ok := ignoredResources[resource]; ok { glog.V(6).Infof("ignore resource %#v", resource) continue diff --git a/pkg/controller/garbagecollector/garbagecollector_test.go b/pkg/controller/garbagecollector/garbagecollector_test.go index 0dba2b1daa5..2efbebfc81f 100644 --- a/pkg/controller/garbagecollector/garbagecollector_test.go +++ b/pkg/controller/garbagecollector/garbagecollector_test.go @@ -49,7 +49,7 @@ func TestNewGarbageCollector(t *testing.T) { metaOnlyClientPool := dynamic.NewClientPool(config, registered.RESTMapper(), dynamic.LegacyAPIPathResolverFunc) config.ContentConfig.NegotiatedSerializer = nil clientPool := dynamic.NewClientPool(config, registered.RESTMapper(), dynamic.LegacyAPIPathResolverFunc) - podResource := []schema.GroupVersionResource{{Version: "v1", Resource: "pods"}} + podResource := map[schema.GroupVersionResource]struct{}{schema.GroupVersionResource{Version: "v1", Resource: "pods"}: {}} gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, registered.RESTMapper(), podResource) if err != nil { t.Fatal(err) @@ -113,7 +113,7 @@ func setupGC(t *testing.T, config *restclient.Config) *GarbageCollector { metaOnlyClientPool := dynamic.NewClientPool(config, registered.RESTMapper(), dynamic.LegacyAPIPathResolverFunc) config.ContentConfig.NegotiatedSerializer = nil clientPool := dynamic.NewClientPool(config, registered.RESTMapper(), dynamic.LegacyAPIPathResolverFunc) - podResource := []schema.GroupVersionResource{{Version: "v1", Resource: "pods"}} + podResource := map[schema.GroupVersionResource]struct{}{schema.GroupVersionResource{Version: "v1", Resource: "pods"}: {}} gc, err := NewGarbageCollector(metaOnlyClientPool, clientPool, registered.RESTMapper(), podResource) if err != nil { t.Fatal(err) diff --git a/pkg/controller/namespace/namespace_controller.go b/pkg/controller/namespace/namespace_controller.go index 741e3bbe8b9..c80f9b3b808 100644 --- a/pkg/controller/namespace/namespace_controller.go +++ b/pkg/controller/namespace/namespace_controller.go @@ -20,6 +20,7 @@ import ( "time" "k8s.io/kubernetes/pkg/api/v1" + metav1 "k8s.io/kubernetes/pkg/apis/meta/v1" "k8s.io/kubernetes/pkg/client/cache" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5" "k8s.io/kubernetes/pkg/client/typed/dynamic" @@ -28,6 +29,7 @@ import ( "k8s.io/kubernetes/pkg/runtime/schema" "k8s.io/kubernetes/pkg/util/metrics" utilruntime "k8s.io/kubernetes/pkg/util/runtime" + "k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/workqueue" "k8s.io/kubernetes/pkg/watch" @@ -57,8 +59,8 @@ type NamespaceController struct { controller *cache.Controller // namespaces that have been queued up for processing by workers queue workqueue.RateLimitingInterface - // function to list of preferred group versions and their corresponding resource set for namespace deletion - groupVersionResourcesFn func() (map[schema.GroupVersionResource]struct{}, error) + // function to list of preferred resources for namespace deletion + discoverResourcesFn func() ([]*metav1.APIResourceList, error) // opCache is a cache to remember if a particular operation is not supported to aid dynamic client. opCache *operationNotSupportedCache // finalizerToken is the finalizer token managed by this controller @@ -69,36 +71,55 @@ type NamespaceController struct { func NewNamespaceController( kubeClient clientset.Interface, clientPool dynamic.ClientPool, - groupVersionResourcesFn func() (map[schema.GroupVersionResource]struct{}, error), + discoverResourcesFn func() ([]*metav1.APIResourceList, error), resyncPeriod time.Duration, finalizerToken v1.FinalizerName) *NamespaceController { - // the namespace deletion code looks at the discovery document to enumerate the set of resources on the server. - // it then finds all namespaced resources, and in response to namespace deletion, will call delete on all of them. - // unfortunately, the discovery information does not include the list of supported verbs/methods. if the namespace - // controller calls LIST/DELETECOLLECTION for a resource, it will get a 405 error from the server and cache that that was the case. - // we found in practice though that some auth engines when encountering paths they don't know about may return a 50x. - // until we have verbs, we pre-populate resources that do not support list or delete for well-known apis rather than - // probing the server once in order to be told no. opCache := &operationNotSupportedCache{ m: make(map[operationKey]bool), } - ignoredGroupVersionResources := []schema.GroupVersionResource{ - {Group: "", Version: "v1", Resource: "bindings"}, + + // pre-fill opCache with the discovery info + // + // TODO(sttts): get rid of opCache and http 405 logic around it and trust discovery info + resources, err := discoverResourcesFn() + if err != nil { + glog.Fatalf("Failed to get supported resources: %v", err) } - for _, ignoredGroupVersionResource := range ignoredGroupVersionResources { - opCache.setNotSupported(operationKey{op: operationDeleteCollection, gvr: ignoredGroupVersionResource}) - opCache.setNotSupported(operationKey{op: operationList, gvr: ignoredGroupVersionResource}) + deletableGroupVersionResources := []schema.GroupVersionResource{} + for _, rl := range resources { + gv, err := schema.ParseGroupVersion(rl.GroupVersion) + if err != nil { + glog.Errorf("Failed to parse GroupVersion %q, skipping: %v", rl.GroupVersion, err) + continue + } + + for _, r := range rl.APIResources { + gvr := schema.GroupVersionResource{Group: gv.Group, Version: gv.Version, Resource: r.Name} + verbs := sets.NewString([]string(r.Verbs)...) + + if !verbs.Has("delete") { + glog.V(6).Infof("Skipping resource %v because it cannot be deleted.", gvr) + } + + for _, op := range []operation{operationList, operationDeleteCollection} { + if !verbs.Has(string(op)) { + opCache.setNotSupported(operationKey{op: op, gvr: gvr}) + } + } + + deletableGroupVersionResources = append(deletableGroupVersionResources, gvr) + } } // create the controller so we can inject the enqueue function namespaceController := &NamespaceController{ - kubeClient: kubeClient, - clientPool: clientPool, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "namespace"), - groupVersionResourcesFn: groupVersionResourcesFn, - opCache: opCache, - finalizerToken: finalizerToken, + kubeClient: kubeClient, + clientPool: clientPool, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "namespace"), + discoverResourcesFn: discoverResourcesFn, + opCache: opCache, + finalizerToken: finalizerToken, } if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil { @@ -203,7 +224,7 @@ func (nm *NamespaceController) syncNamespaceFromKey(key string) (err error) { return err } namespace := obj.(*v1.Namespace) - return syncNamespace(nm.kubeClient, nm.clientPool, nm.opCache, nm.groupVersionResourcesFn, namespace, nm.finalizerToken) + return syncNamespace(nm.kubeClient, nm.clientPool, nm.opCache, nm.discoverResourcesFn, namespace, nm.finalizerToken) } // Run starts observing the system with the specified number of workers. diff --git a/pkg/controller/namespace/namespace_controller_test.go b/pkg/controller/namespace/namespace_controller_test.go index 3d06f8f5cb1..faa22f05973 100644 --- a/pkg/controller/namespace/namespace_controller_test.go +++ b/pkg/controller/namespace/namespace_controller_test.go @@ -34,6 +34,7 @@ import ( "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5/fake" "k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/client/testing/core" + "k8s.io/kubernetes/pkg/client/typed/discovery" "k8s.io/kubernetes/pkg/client/typed/dynamic" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime/schema" @@ -113,8 +114,9 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *metav1.APIVersio // when doing a delete all of content, we will do a GET of a collection, and DELETE of a collection by default dynamicClientActionSet := sets.NewString() - groupVersionResources := testGroupVersionResources() - for _, groupVersionResource := range groupVersionResources { + resources := testResources() + groupVersionResources, _ := discovery.GroupVersionResources(resources) + for groupVersionResource := range groupVersionResources { urlPath := path.Join([]string{ dynamic.LegacyAPIPathResolverFunc(schema.GroupVersionKind{Group: groupVersionResource.Group, Version: groupVersionResource.Version}), groupVersionResource.Group, @@ -170,8 +172,8 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *metav1.APIVersio mockClient := fake.NewSimpleClientset(testInput.testNamespace) clientPool := dynamic.NewClientPool(clientConfig, registered.RESTMapper(), dynamic.LegacyAPIPathResolverFunc) - fn := func() ([]schema.GroupVersionResource, error) { - return groupVersionResources, nil + fn := func() ([]*metav1.APIResourceList, error) { + return resources, nil } err := syncNamespace(mockClient, clientPool, &operationNotSupportedCache{m: make(map[operationKey]bool)}, fn, testInput.testNamespace, v1.FinalizerKubernetes) @@ -243,8 +245,8 @@ func TestSyncNamespaceThatIsActive(t *testing.T) { Phase: v1.NamespaceActive, }, } - fn := func() ([]schema.GroupVersionResource, error) { - return testGroupVersionResources(), nil + fn := func() ([]*metav1.APIResourceList, error) { + return testResources(), nil } err := syncNamespace(mockClient, nil, &operationNotSupportedCache{m: make(map[operationKey]bool)}, fn, testNamespace, v1.FinalizerKubernetes) if err != nil { @@ -295,11 +297,37 @@ func (f *fakeActionHandler) ServeHTTP(response http.ResponseWriter, request *htt response.Write([]byte("{\"kind\": \"List\",\"items\":null}")) } -// testGroupVersionResources returns a mocked up set of resources across different api groups for testing namespace controller. -func testGroupVersionResources() []schema.GroupVersionResource { - results := []schema.GroupVersionResource{} - results = append(results, schema.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}) - results = append(results, schema.GroupVersionResource{Group: "", Version: "v1", Resource: "services"}) - results = append(results, schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "deployments"}) +// testResources returns a mocked up set of resources across different api groups for testing namespace controller. +func testResources() []*metav1.APIResourceList { + results := []*metav1.APIResourceList{ + { + GroupVersion: "v1", + APIResources: []metav1.APIResource{ + { + Name: "pods", + Namespaced: true, + Kind: "Pod", + Verbs: []string{"get", "list", "delete", "deletecollection", "create", "update"}, + }, + { + Name: "services", + Namespaced: true, + Kind: "Service", + Verbs: []string{"get", "list", "delete", "deletecollection", "create", "update"}, + }, + }, + }, + { + GroupVersion: "extensions/v1beta1", + APIResources: []metav1.APIResource{ + { + Name: "deployments", + Namespaced: true, + Kind: "Deployment", + Verbs: []string{"get", "list", "delete", "deletecollection", "create", "update"}, + }, + }, + }, + } return results } diff --git a/pkg/controller/namespace/namespace_controller_utils.go b/pkg/controller/namespace/namespace_controller_utils.go index 6cedcfc5b18..779e90c054d 100644 --- a/pkg/controller/namespace/namespace_controller_utils.go +++ b/pkg/controller/namespace/namespace_controller_utils.go @@ -25,6 +25,7 @@ import ( "k8s.io/kubernetes/pkg/api/v1" metav1 "k8s.io/kubernetes/pkg/apis/meta/v1" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5" + "k8s.io/kubernetes/pkg/client/typed/discovery" "k8s.io/kubernetes/pkg/client/typed/dynamic" "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime/schema" @@ -367,7 +368,7 @@ func syncNamespace( kubeClient clientset.Interface, clientPool dynamic.ClientPool, opCache *operationNotSupportedCache, - groupVersionResourcesFn func() (map[schema.GroupVersionResource]struct{}, error), + discoverResourcesFn func() ([]*metav1.APIResourceList, error), namespace *v1.Namespace, finalizerToken v1.FinalizerName, ) error { @@ -418,7 +419,13 @@ func syncNamespace( } // there may still be content for us to remove - groupVersionResources, err := groupVersionResourcesFn() + resources, err := discoverResourcesFn() + if err != nil { + return err + } + // TODO(sttts): get rid of opCache and pass the verbs (especially "deletecollection") down into the deleter + deletableResources := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"delete"}}, resources) + groupVersionResources, err := discovery.GroupVersionResources(deletableResources) if err != nil { return err } diff --git a/test/e2e_node/services/namespace_controller.go b/test/e2e_node/services/namespace_controller.go index 320c4d064be..6420817b420 100644 --- a/test/e2e_node/services/namespace_controller.go +++ b/test/e2e_node/services/namespace_controller.go @@ -56,8 +56,8 @@ func (n *NamespaceController) Start() error { return err } clientPool := dynamic.NewClientPool(config, registered.RESTMapper(), dynamic.LegacyAPIPathResolverFunc) - gvrFn := client.Discovery().ServerPreferredNamespacedResources - nc := namespacecontroller.NewNamespaceController(client, clientPool, gvrFn, ncResyncPeriod, v1.FinalizerKubernetes) + discoverResourcesFn := client.Discovery().ServerPreferredNamespacedResources + nc := namespacecontroller.NewNamespaceController(client, clientPool, discoverResourcesFn, ncResyncPeriod, v1.FinalizerKubernetes) go nc.Run(ncConcurrency, n.stopCh) return nil } diff --git a/test/integration/garbagecollector/garbage_collector_test.go b/test/integration/garbagecollector/garbage_collector_test.go index ef6348278e9..1061094bd06 100644 --- a/test/integration/garbagecollector/garbage_collector_test.go +++ b/test/integration/garbagecollector/garbage_collector_test.go @@ -29,12 +29,14 @@ import ( "github.com/golang/glog" dto "github.com/prometheus/client_model/go" + "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" "k8s.io/kubernetes/pkg/apimachinery/registered" metav1 "k8s.io/kubernetes/pkg/apis/meta/v1" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/release_1_5" "k8s.io/kubernetes/pkg/client/restclient" + "k8s.io/kubernetes/pkg/client/typed/discovery" "k8s.io/kubernetes/pkg/client/typed/dynamic" "k8s.io/kubernetes/pkg/controller/garbagecollector" "k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly" @@ -128,16 +130,21 @@ func setup(t *testing.T) (*httptest.Server, *garbagecollector.GarbageCollector, if err != nil { t.Fatalf("Error in create clientset: %v", err) } - groupVersionResources, err := clientSet.Discovery().ServerPreferredResources() + preferredResources, err := clientSet.Discovery().ServerPreferredResources() if err != nil { t.Fatalf("Failed to get supported resources from server: %v", err) } + deletableResources := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"delete"}}, preferredResources) + deletableGroupVersionResources, err := discovery.GroupVersionResources(deletableResources) + if err != nil { + t.Fatalf("Failed to parse supported resources from server: %v", err) + } config := &restclient.Config{Host: s.URL} config.ContentConfig.NegotiatedSerializer = serializer.DirectCodecFactory{CodecFactory: metaonly.NewMetadataCodecFactory()} metaOnlyClientPool := dynamic.NewClientPool(config, registered.RESTMapper(), dynamic.LegacyAPIPathResolverFunc) config.ContentConfig.NegotiatedSerializer = nil clientPool := dynamic.NewClientPool(config, registered.RESTMapper(), dynamic.LegacyAPIPathResolverFunc) - gc, err := garbagecollector.NewGarbageCollector(metaOnlyClientPool, clientPool, registered.RESTMapper(), groupVersionResources) + gc, err := garbagecollector.NewGarbageCollector(metaOnlyClientPool, clientPool, registered.RESTMapper(), deletableGroupVersionResources) if err != nil { t.Fatalf("Failed to create garbage collector") }