From ef6529bf2f910290bb3fc1108f0f3f588f27b52d Mon Sep 17 00:00:00 2001 From: Brendan Burns Date: Mon, 31 Oct 2016 22:55:28 -0700 Subject: [PATCH] make groupVersionResource listing dynamic when third party resources are enabled. --- .../app/controllermanager.go | 25 ++++++++++++++++--- .../namespace/namespace_controller.go | 14 +++++------ .../namespace/namespace_controller_test.go | 21 ++++++++++++++-- .../namespace/namespace_controller_utils.go | 6 ++++- .../e2e_node/services/namespace_controller.go | 7 ++---- 5 files changed, 55 insertions(+), 18 deletions(-) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index dacd9cba920..17a87feb6b6 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -369,11 +369,30 @@ func StartControllers(s *options.CMServer, kubeconfig *restclient.Config, rootCl // Find the list of namespaced resources via discovery that the namespace controller must manage namespaceKubeClient := client("namespace-controller") namespaceClientPool := dynamic.NewClientPool(restclient.AddUserAgent(kubeconfig, "namespace-controller"), restMapper, dynamic.LegacyAPIPathResolverFunc) - groupVersionResources, err := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources() + // TODO: consider using a list-watch + cache here rather than polling + var gvrFn func() ([]unversioned.GroupVersionResource, error) + rsrcs, err := namespaceKubeClient.Discovery().ServerResources() if err != nil { - glog.Fatalf("Failed to get supported resources from server: %v", err) + glog.Fatalf("Failed to get group version resources: %v", err) } - namespaceController := namespacecontroller.NewNamespaceController(namespaceKubeClient, namespaceClientPool, groupVersionResources, s.NamespaceSyncPeriod.Duration, api.FinalizerKubernetes) + for _, rsrcList := range rsrcs { + for ix := range rsrcList.APIResources { + rsrc := &rsrcList.APIResources[ix] + if rsrc.Kind == "ThirdPartyResource" { + gvrFn = namespaceKubeClient.Discovery().ServerPreferredNamespacedResources + } + } + } + if gvrFn == nil { + gvr, err := namespaceKubeClient.Discovery().ServerPreferredNamespacedResources() + if err != nil { + glog.Fatalf("Failed to get resources: %v", err) + } + gvrFn = func() ([]unversioned.GroupVersionResource, error) { + return gvr, nil + } + } + namespaceController := namespacecontroller.NewNamespaceController(namespaceKubeClient, namespaceClientPool, gvrFn, s.NamespaceSyncPeriod.Duration, api.FinalizerKubernetes) go namespaceController.Run(int(s.ConcurrentNamespaceSyncs), wait.NeverStop) time.Sleep(wait.Jitter(s.ControllerStartInterval.Duration, ControllerStartJitter)) diff --git a/pkg/controller/namespace/namespace_controller.go b/pkg/controller/namespace/namespace_controller.go index d5111190535..69065aa7ca0 100644 --- a/pkg/controller/namespace/namespace_controller.go +++ b/pkg/controller/namespace/namespace_controller.go @@ -47,8 +47,8 @@ type NamespaceController struct { controller *cache.Controller // namespaces that have been queued up for processing by workers queue workqueue.RateLimitingInterface - // list of preferred group versions and their corresponding resource set for namespace deletion - groupVersionResources []unversioned.GroupVersionResource + // function to list of preferred group versions and their corresponding resource set for namespace deletion + groupVersionResourcesFn func() ([]unversioned.GroupVersionResource, error) // opCache is a cache to remember if a particular operation is not supported to aid dynamic client. opCache *operationNotSupportedCache // finalizerToken is the finalizer token managed by this controller @@ -59,7 +59,7 @@ type NamespaceController struct { func NewNamespaceController( kubeClient clientset.Interface, clientPool dynamic.ClientPool, - groupVersionResources []unversioned.GroupVersionResource, + groupVersionResourcesFn func() ([]unversioned.GroupVersionResource, error), resyncPeriod time.Duration, finalizerToken api.FinalizerName) *NamespaceController { @@ -86,9 +86,9 @@ func NewNamespaceController( kubeClient: kubeClient, clientPool: clientPool, queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "namespace"), - groupVersionResources: groupVersionResources, - opCache: opCache, - finalizerToken: finalizerToken, + groupVersionResourcesFn: groupVersionResourcesFn, + opCache: opCache, + finalizerToken: finalizerToken, } if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil { @@ -191,7 +191,7 @@ func (nm *NamespaceController) syncNamespaceFromKey(key string) (err error) { return err } namespace := obj.(*api.Namespace) - return syncNamespace(nm.kubeClient, nm.clientPool, nm.opCache, nm.groupVersionResources, namespace, nm.finalizerToken) + return syncNamespace(nm.kubeClient, nm.clientPool, nm.opCache, nm.groupVersionResourcesFn, namespace, nm.finalizerToken) } // Run starts observing the system with the specified number of workers. diff --git a/pkg/controller/namespace/namespace_controller_test.go b/pkg/controller/namespace/namespace_controller_test.go index a278fba5c0d..f4d4cb67e1c 100644 --- a/pkg/controller/namespace/namespace_controller_test.go +++ b/pkg/controller/namespace/namespace_controller_test.go @@ -129,6 +129,7 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *unversioned.APIV testNamespace *api.Namespace kubeClientActionSet sets.String dynamicClientActionSet sets.String + gvrError error }{ "pending-finalize": { testNamespace: testNamespacePendingFinalize, @@ -148,6 +149,15 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *unversioned.APIV ), dynamicClientActionSet: sets.NewString(), }, + "groupVersionResourceErr": { + testNamespace: testNamespaceFinalizeComplete, + kubeClientActionSet: sets.NewString( + strings.Join([]string{"get", "namespaces", ""}, "-"), + strings.Join([]string{"delete", "namespaces", ""}, "-"), + ), + dynamicClientActionSet: sets.NewString(), + gvrError: fmt.Errorf("test error"), + }, } for scenario, testInput := range scenarios { @@ -158,7 +168,11 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *unversioned.APIV mockClient := fake.NewSimpleClientset(testInput.testNamespace) clientPool := dynamic.NewClientPool(clientConfig, registered.RESTMapper(), dynamic.LegacyAPIPathResolverFunc) - err := syncNamespace(mockClient, clientPool, &operationNotSupportedCache{m: make(map[operationKey]bool)}, groupVersionResources, testInput.testNamespace, api.FinalizerKubernetes) + fn := func() ([]unversioned.GroupVersionResource, error) { + return groupVersionResources, nil + } + + err := syncNamespace(mockClient, clientPool, &operationNotSupportedCache{m: make(map[operationKey]bool)}, fn, testInput.testNamespace, api.FinalizerKubernetes) if err != nil { t.Errorf("scenario %s - Unexpected error when synching namespace %v", scenario, err) } @@ -227,7 +241,10 @@ func TestSyncNamespaceThatIsActive(t *testing.T) { Phase: api.NamespaceActive, }, } - err := syncNamespace(mockClient, nil, &operationNotSupportedCache{m: make(map[operationKey]bool)}, testGroupVersionResources(), testNamespace, api.FinalizerKubernetes) + fn := func() ([]unversioned.GroupVersionResource, error) { + return testGroupVersionResources(), nil + } + err := syncNamespace(mockClient, nil, &operationNotSupportedCache{m: make(map[operationKey]bool)}, fn, testNamespace, api.FinalizerKubernetes) if err != nil { t.Errorf("Unexpected error when synching namespace %v", err) } diff --git a/pkg/controller/namespace/namespace_controller_utils.go b/pkg/controller/namespace/namespace_controller_utils.go index f6bffd58bde..04db7841eb9 100644 --- a/pkg/controller/namespace/namespace_controller_utils.go +++ b/pkg/controller/namespace/namespace_controller_utils.go @@ -371,7 +371,7 @@ func syncNamespace( kubeClient clientset.Interface, clientPool dynamic.ClientPool, opCache *operationNotSupportedCache, - groupVersionResources []unversioned.GroupVersionResource, + groupVersionResourcesFn func() ([]unversioned.GroupVersionResource, error), namespace *api.Namespace, finalizerToken api.FinalizerName, ) error { @@ -422,6 +422,10 @@ func syncNamespace( } // there may still be content for us to remove + groupVersionResources, err := groupVersionResourcesFn() + if err != nil { + return err + } estimate, err := deleteAllContent(kubeClient, clientPool, opCache, groupVersionResources, namespace.Name, *namespace.DeletionTimestamp) if err != nil { return err diff --git a/test/e2e_node/services/namespace_controller.go b/test/e2e_node/services/namespace_controller.go index e4282edba93..b16e67fd7ec 100644 --- a/test/e2e_node/services/namespace_controller.go +++ b/test/e2e_node/services/namespace_controller.go @@ -56,11 +56,8 @@ func (n *NamespaceController) Start() error { return err } clientPool := dynamic.NewClientPool(config, registered.RESTMapper(), dynamic.LegacyAPIPathResolverFunc) - resources, err := client.Discovery().ServerPreferredNamespacedResources() - if err != nil { - return err - } - nc := namespacecontroller.NewNamespaceController(client, clientPool, resources, ncResyncPeriod, api.FinalizerKubernetes) + gvrFn := client.Discovery().ServerPreferredNamespacedResources + nc := namespacecontroller.NewNamespaceController(client, clientPool, gvrFn, ncResyncPeriod, api.FinalizerKubernetes) go nc.Run(ncConcurrency, n.stopCh) return nil }