From 41057b02d5f69e6bb4804884f983a2b35e38c688 Mon Sep 17 00:00:00 2001 From: derekwaynecarr Date: Wed, 2 Mar 2016 23:34:18 -0500 Subject: [PATCH] Move namespace controller to use discovery and dynamic client --- .../app/controllermanager.go | 10 +- .../controllermanager/controllermanager.go | 10 +- .../namespace/namespace_controller.go | 25 +- .../namespace/namespace_controller_test.go | 158 ++++-- .../namespace/namespace_controller_utils.go | 472 +++++++++++------- 5 files changed, 426 insertions(+), 249 deletions(-) diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 9dfc91cb4c6..df7dcfe9038 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -40,6 +40,7 @@ import ( "k8s.io/kubernetes/pkg/client/leaderelection" "k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/restclient" + "k8s.io/kubernetes/pkg/client/typed/dynamic" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" "k8s.io/kubernetes/pkg/cloudprovider" @@ -266,7 +267,14 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig glog.Fatalf("Failed to get supported resources from server: %v", err) } - namespaceController := namespacecontroller.NewNamespaceController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "namespace-controller")), versions, s.NamespaceSyncPeriod.Duration) + // Find the list of namespaced resources via discovery that the namespace controller must manage + namespaceKubeClient := clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "namespace-controller")) + namespaceClientPool := dynamic.NewClientPool(restclient.AddUserAgent(kubeconfig, "namespace-controller"), dynamic.LegacyAPIPathResolverFunc) + groupVersionResources, err := namespacecontroller.ServerPreferredNamespacedGroupVersionResources(namespaceKubeClient.Discovery()) + if err != nil { + glog.Fatalf("Failed to get supported resources from server: %v", err) + } + namespaceController := namespacecontroller.NewNamespaceController(namespaceKubeClient, namespaceClientPool, groupVersionResources, s.NamespaceSyncPeriod.Duration, api.FinalizerKubernetes) go namespaceController.Run(s.ConcurrentNamespaceSyncs, wait.NeverStop) groupVersion := "extensions/v1beta1" diff --git a/contrib/mesos/pkg/controllermanager/controllermanager.go b/contrib/mesos/pkg/controllermanager/controllermanager.go index e383e792250..dd8f66464d3 100644 --- a/contrib/mesos/pkg/controllermanager/controllermanager.go +++ b/contrib/mesos/pkg/controllermanager/controllermanager.go @@ -32,6 +32,7 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/restclient" + "k8s.io/kubernetes/pkg/client/typed/dynamic" client "k8s.io/kubernetes/pkg/client/unversioned" "k8s.io/kubernetes/pkg/client/unversioned/clientcmd" clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api" @@ -214,7 +215,14 @@ func (s *CMServer) Run(_ []string) error { glog.Fatalf("Failed to get supported resources from server: %v", err) } - namespaceController := namespacecontroller.NewNamespaceController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "namespace-controller")), versions, s.NamespaceSyncPeriod.Duration) + // Find the list of namespaced resources via discovery that the namespace controller must manage + namespaceKubeClient := clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "namespace-controller")) + namespaceClientPool := dynamic.NewClientPool(restclient.AddUserAgent(kubeconfig, "namespace-controller"), dynamic.LegacyAPIPathResolverFunc) + groupVersionResources, err := namespacecontroller.ServerPreferredNamespacedGroupVersionResources(namespaceKubeClient.Discovery()) + if err != nil { + glog.Fatalf("Failed to get supported resources from server: %v", err) + } + namespaceController := namespacecontroller.NewNamespaceController(namespaceKubeClient, namespaceClientPool, groupVersionResources, s.NamespaceSyncPeriod.Duration, api.FinalizerKubernetes) go namespaceController.Run(s.ConcurrentNamespaceSyncs, wait.NeverStop) groupVersion := "extensions/v1beta1" diff --git a/pkg/controller/namespace/namespace_controller.go b/pkg/controller/namespace/namespace_controller.go index 8d00bcc84d3..a1c6d4aacac 100644 --- a/pkg/controller/namespace/namespace_controller.go +++ b/pkg/controller/namespace/namespace_controller.go @@ -23,6 +23,7 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/client/cache" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + "k8s.io/kubernetes/pkg/client/typed/dynamic" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/runtime" @@ -38,23 +39,37 @@ import ( type NamespaceController struct { // client that purges namespace content, must have list/delete privileges on all content kubeClient clientset.Interface + // clientPool manages a pool of dynamic clients + clientPool dynamic.ClientPool // store that holds the namespaces store cache.Store // controller that observes the namespaces controller *framework.Controller // namespaces that have been queued up for processing by workers queue *workqueue.Type - // list of versions to process - versions *unversioned.APIVersions + // list of preferred group versions and their corresponding resource set for namespace deletion + groupVersionResources []unversioned.GroupVersionResource + // opCache is a cache to remember if a particular operation is not supported to aid dynamic client. + opCache operationNotSupportedCache + // finalizerToken is the finalizer token managed by this controller + finalizerToken api.FinalizerName } // NewNamespaceController creates a new NamespaceController -func NewNamespaceController(kubeClient clientset.Interface, versions *unversioned.APIVersions, resyncPeriod time.Duration) *NamespaceController { +func NewNamespaceController( + kubeClient clientset.Interface, + clientPool dynamic.ClientPool, + groupVersionResources []unversioned.GroupVersionResource, + resyncPeriod time.Duration, + finalizerToken api.FinalizerName) *NamespaceController { // create the controller so we can inject the enqueue function namespaceController := &NamespaceController{ kubeClient: kubeClient, - versions: versions, + clientPool: clientPool, queue: workqueue.New(), + groupVersionResources: groupVersionResources, + opCache: operationNotSupportedCache{}, + finalizerToken: finalizerToken, } // configure the backing store/controller @@ -144,7 +159,7 @@ func (nm *NamespaceController) syncNamespaceFromKey(key string) (err error) { return err } namespace := obj.(*api.Namespace) - return syncNamespace(nm.kubeClient, nm.versions, namespace) + return syncNamespace(nm.kubeClient, nm.clientPool, nm.opCache, nm.groupVersionResources, namespace, nm.finalizerToken) } // Run starts observing the system with the specified number of workers. diff --git a/pkg/controller/namespace/namespace_controller_test.go b/pkg/controller/namespace/namespace_controller_test.go index 3e523c06cc7..5061f25f3f1 100644 --- a/pkg/controller/namespace/namespace_controller_test.go +++ b/pkg/controller/namespace/namespace_controller_test.go @@ -18,7 +18,11 @@ package namespace import ( "fmt" + "net/http" + "net/http/httptest" + "path" "strings" + "sync" "testing" "k8s.io/kubernetes/pkg/api" @@ -26,7 +30,9 @@ import ( "k8s.io/kubernetes/pkg/api/unversioned" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" + "k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/client/testing/core" + "k8s.io/kubernetes/pkg/client/typed/dynamic" "k8s.io/kubernetes/pkg/util/sets" ) @@ -56,7 +62,7 @@ func TestFinalizeNamespaceFunc(t *testing.T) { Finalizers: []api.FinalizerName{"kubernetes", "other"}, }, } - finalizeNamespaceFunc(mockClient, testNamespace) + finalizeNamespace(mockClient, testNamespace, api.FinalizerKubernetes) actions := mockClient.Actions() if len(actions) != 1 { t.Errorf("Expected 1 mock client action, but got %v", len(actions)) @@ -75,9 +81,10 @@ func TestFinalizeNamespaceFunc(t *testing.T) { func testSyncNamespaceThatIsTerminating(t *testing.T, versions *unversioned.APIVersions) { now := unversioned.Now() + namespaceName := "test" testNamespacePendingFinalize := &api.Namespace{ ObjectMeta: api.ObjectMeta{ - Name: "test", + Name: namespaceName, ResourceVersion: "1", DeletionTimestamp: &now, }, @@ -90,7 +97,7 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *unversioned.APIV } testNamespaceFinalizeComplete := &api.Namespace{ ObjectMeta: api.ObjectMeta{ - Name: "test", + Name: namespaceName, ResourceVersion: "1", DeletionTimestamp: &now, }, @@ -100,78 +107,77 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *unversioned.APIV }, } - // TODO: Reuse the constants for all these strings from testclient - pendingActionSet := sets.NewString( - strings.Join([]string{"get", "namespaces", ""}, "-"), - strings.Join([]string{"delete-collection", "replicationcontrollers", ""}, "-"), - strings.Join([]string{"list", "services", ""}, "-"), - strings.Join([]string{"list", "pods", ""}, "-"), - strings.Join([]string{"delete-collection", "resourcequotas", ""}, "-"), - strings.Join([]string{"delete-collection", "secrets", ""}, "-"), - strings.Join([]string{"delete-collection", "configmaps", ""}, "-"), - strings.Join([]string{"delete-collection", "limitranges", ""}, "-"), - strings.Join([]string{"delete-collection", "events", ""}, "-"), - strings.Join([]string{"delete-collection", "serviceaccounts", ""}, "-"), - strings.Join([]string{"delete-collection", "persistentvolumeclaims", ""}, "-"), - strings.Join([]string{"create", "namespaces", "finalize"}, "-"), - ) - - if containsVersion(versions, "extensions/v1beta1") { - pendingActionSet.Insert( - strings.Join([]string{"delete-collection", "daemonsets", ""}, "-"), - strings.Join([]string{"delete-collection", "deployments", ""}, "-"), - strings.Join([]string{"delete-collection", "replicasets", ""}, "-"), - strings.Join([]string{"delete-collection", "jobs", ""}, "-"), - strings.Join([]string{"delete-collection", "horizontalpodautoscalers", ""}, "-"), - strings.Join([]string{"delete-collection", "ingresses", ""}, "-"), - strings.Join([]string{"get", "resource", ""}, "-"), - ) + // when doing a delete all of content, we will do a GET of a collection, and DELETE of a collection by default + dynamicClientActionSet := sets.NewString() + groupVersionResources := testGroupVersionResources() + for _, groupVersionResource := range groupVersionResources { + urlPath := path.Join([]string{ + dynamic.LegacyAPIPathResolverFunc(groupVersionResource.GroupVersion()), + groupVersionResource.Group, + groupVersionResource.Version, + "namespaces", + namespaceName, + groupVersionResource.Resource, + }...) + dynamicClientActionSet.Insert((&fakeAction{method: "GET", path: urlPath}).String()) + dynamicClientActionSet.Insert((&fakeAction{method: "DELETE", path: urlPath}).String()) } scenarios := map[string]struct { - testNamespace *api.Namespace - expectedActionSet sets.String + testNamespace *api.Namespace + kubeClientActionSet sets.String + dynamicClientActionSet sets.String }{ "pending-finalize": { - testNamespace: testNamespacePendingFinalize, - expectedActionSet: pendingActionSet, + testNamespace: testNamespacePendingFinalize, + kubeClientActionSet: sets.NewString( + strings.Join([]string{"get", "namespaces", ""}, "-"), + strings.Join([]string{"list", "pods", ""}, "-"), + strings.Join([]string{"create", "namespaces", "finalize"}, "-"), + ), + dynamicClientActionSet: dynamicClientActionSet, }, "complete-finalize": { testNamespace: testNamespaceFinalizeComplete, - expectedActionSet: sets.NewString( + kubeClientActionSet: sets.NewString( strings.Join([]string{"get", "namespaces", ""}, "-"), strings.Join([]string{"delete", "namespaces", ""}, "-"), ), + dynamicClientActionSet: sets.NewString(), }, } for scenario, testInput := range scenarios { + testHandler := &fakeActionHandler{statusCode: 200} + srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP) + defer srv.Close() + mockClient := fake.NewSimpleClientset(testInput.testNamespace) - if containsVersion(versions, "extensions/v1beta1") { - resources := []unversioned.APIResource{} - for _, resource := range []string{"daemonsets", "deployments", "replicasets", "jobs", "horizontalpodautoscalers", "ingresses"} { - resources = append(resources, unversioned.APIResource{Name: resource}) - } - mockClient.Resources = map[string]*unversioned.APIResourceList{ - "extensions/v1beta1": { - GroupVersion: "extensions/v1beta1", - APIResources: resources, - }, - } - } - err := syncNamespace(mockClient, versions, testInput.testNamespace) + clientPool := dynamic.NewClientPool(clientConfig, dynamic.LegacyAPIPathResolverFunc) + + err := syncNamespace(mockClient, clientPool, operationNotSupportedCache{}, groupVersionResources, testInput.testNamespace, api.FinalizerKubernetes) if err != nil { t.Errorf("scenario %s - Unexpected error when synching namespace %v", scenario, err) } + + // validate traffic from kube client actionSet := sets.NewString() for _, action := range mockClient.Actions() { actionSet.Insert(strings.Join([]string{action.GetVerb(), action.GetResource(), action.GetSubresource()}, "-")) } - if !actionSet.HasAll(testInput.expectedActionSet.List()...) { - t.Errorf("scenario %s - Expected actions:\n%v\n but got:\n%v\nDifference:\n%v", scenario, testInput.expectedActionSet, actionSet, testInput.expectedActionSet.Difference(actionSet)) + if !actionSet.Equal(testInput.kubeClientActionSet) { + t.Errorf("scenario %s - mock client expected actions:\n%v\n but got:\n%v\nDifference:\n%v", scenario, + testInput.kubeClientActionSet, actionSet, testInput.kubeClientActionSet.Difference(actionSet)) } - if !testInput.expectedActionSet.HasAll(actionSet.List()...) { - t.Errorf("scenario %s - Expected actions:\n%v\n but got:\n%v\nDifference:\n%v", scenario, testInput.expectedActionSet, actionSet, actionSet.Difference(testInput.expectedActionSet)) + + // validate traffic from dynamic client + actionSet = sets.NewString() + for _, action := range testHandler.actions { + actionSet.Insert(action.String()) + } + if !actionSet.Equal(testInput.dynamicClientActionSet) { + t.Errorf("scenario %s - dynamic client expected actions:\n%v\n but got:\n%v\nDifference:\n%v", scenario, + testInput.dynamicClientActionSet, actionSet, testInput.dynamicClientActionSet.Difference(actionSet)) } } } @@ -218,7 +224,7 @@ func TestSyncNamespaceThatIsActive(t *testing.T) { Phase: api.NamespaceActive, }, } - err := syncNamespace(mockClient, &unversioned.APIVersions{}, testNamespace) + err := syncNamespace(mockClient, nil, operationNotSupportedCache{}, testGroupVersionResources(), testNamespace, api.FinalizerKubernetes) if err != nil { t.Errorf("Unexpected error when synching namespace %v", err) } @@ -226,3 +232,51 @@ func TestSyncNamespaceThatIsActive(t *testing.T) { t.Errorf("Expected no action from controller, but got: %v", mockClient.Actions()) } } + +// testServerAndClientConfig returns a server that listens and a config that can reference it +func testServerAndClientConfig(handler func(http.ResponseWriter, *http.Request)) (*httptest.Server, *restclient.Config) { + srv := httptest.NewServer(http.HandlerFunc(handler)) + config := &restclient.Config{ + Host: srv.URL, + } + return srv, config +} + +// fakeAction records information about requests to aid in testing. +type fakeAction struct { + method string + path string +} + +// String returns method=path to aid in testing +func (f *fakeAction) String() string { + return strings.Join([]string{f.method, f.path}, "=") +} + +// fakeActionHandler holds a list of fakeActions received +type fakeActionHandler struct { + // statusCode returned by this handler + statusCode int + + lock sync.Mutex + actions []fakeAction +} + +// ServeHTTP logs the action that occurred and always returns the associated status code +func (f *fakeActionHandler) ServeHTTP(response http.ResponseWriter, request *http.Request) { + f.lock.Lock() + defer f.lock.Unlock() + + f.actions = append(f.actions, fakeAction{method: request.Method, path: request.URL.Path}) + response.WriteHeader(f.statusCode) + response.Write([]byte("{\"kind\": \"List\"}")) +} + +// testGroupVersionResources returns a mocked up set of resources across different api groups for testing namespace controller. +func testGroupVersionResources() []unversioned.GroupVersionResource { + results := []unversioned.GroupVersionResource{} + results = append(results, unversioned.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"}) + results = append(results, unversioned.GroupVersionResource{Group: "", Version: "v1", Resource: "services"}) + results = append(results, unversioned.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "deployments"}) + return results +} diff --git a/pkg/controller/namespace/namespace_controller_utils.go b/pkg/controller/namespace/namespace_controller_utils.go index b3041e2d9f1..35ce7384a8c 100644 --- a/pkg/controller/namespace/namespace_controller_utils.go +++ b/pkg/controller/namespace/namespace_controller_utils.go @@ -18,12 +18,17 @@ package namespace import ( "fmt" + "strings" + "time" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/api/v1" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" - unversionedextensions "k8s.io/kubernetes/pkg/client/typed/generated/extensions/unversioned" + "k8s.io/kubernetes/pkg/client/typed/dynamic" + client "k8s.io/kubernetes/pkg/client/unversioned" + "k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/util/sets" "github.com/golang/glog" @@ -38,6 +43,29 @@ func (e *contentRemainingError) Error() string { return fmt.Sprintf("some content remains in the namespace, estimate %d seconds before it is removed", e.Estimate) } +// operation is used for caching if an operation is supported on a dynamic client. +type operation string + +const ( + operationDeleteCollection operation = "deleteCollection" + operationList operation = "list" +) + +// operationKey is an entry in a cache. +type operationKey struct { + op operation + gvr unversioned.GroupVersionResource +} + +// operationNotSupportedCache is a simple cache to remember if an operation is not supported for a resource. +// if the operationKey maps to true, it means the operation is not supported. +type operationNotSupportedCache map[operationKey]bool + +// isSupported returns true if the operation is supported +func (o operationNotSupportedCache) isSupported(key operationKey) bool { + return !o[key] +} + // updateNamespaceFunc is a function that makes an update to a namespace type updateNamespaceFunc func(kubeClient clientset.Interface, namespace *api.Namespace) (*api.Namespace, error) @@ -58,7 +86,6 @@ func retryOnConflictError(kubeClient clientset.Interface, namespace *api.Namespa return nil, err } } - return } // updateNamespaceStatusFunc will verify that the status of the namespace is correct @@ -78,14 +105,21 @@ func finalized(namespace *api.Namespace) bool { return len(namespace.Spec.Finalizers) == 0 } -// finalizeNamespaceFunc removes the kubernetes token and finalizes the namespace -func finalizeNamespaceFunc(kubeClient clientset.Interface, namespace *api.Namespace) (*api.Namespace, error) { +// finalizeNamespaceFunc returns a function that knows how to finalize a namespace for specified token. +func finalizeNamespaceFunc(finalizerToken api.FinalizerName) updateNamespaceFunc { + return func(kubeClient clientset.Interface, namespace *api.Namespace) (*api.Namespace, error) { + return finalizeNamespace(kubeClient, namespace, finalizerToken) + } +} + +// finalizeNamespace removes the specified finalizerToken and finalizes the namespace +func finalizeNamespace(kubeClient clientset.Interface, namespace *api.Namespace, finalizerToken api.FinalizerName) (*api.Namespace, error) { namespaceFinalize := api.Namespace{} namespaceFinalize.ObjectMeta = namespace.ObjectMeta namespaceFinalize.Spec = namespace.Spec finalizerSet := sets.NewString() for i := range namespace.Spec.Finalizers { - if namespace.Spec.Finalizers[i] != api.FinalizerKubernetes { + if namespace.Spec.Finalizers[i] != finalizerToken { finalizerSet.Insert(string(namespace.Spec.Finalizers[i])) } } @@ -103,99 +137,207 @@ func finalizeNamespaceFunc(kubeClient clientset.Interface, namespace *api.Namesp return namespace, err } -// deleteAllContent will delete all content known to the system in a namespace. It returns an estimate -// of the time remaining before the remaining resources are deleted. If estimate > 0 not all resources -// are guaranteed to be gone. -// TODO: this should use discovery to delete arbitrary namespace content -func deleteAllContent(kubeClient clientset.Interface, versions *unversioned.APIVersions, namespace string, before unversioned.Time) (estimate int64, err error) { - err = deleteServiceAccounts(kubeClient, namespace) +// deleteCollection is a helper function that will delete the collection of resources +// it returns true if the operation was supported on the server. +// it returns an error if the operation was supported on the server but was unable to complete. +func deleteCollection( + dynamicClient *dynamic.Client, + opCache operationNotSupportedCache, + gvr unversioned.GroupVersionResource, + namespace string, +) (bool, error) { + glog.V(4).Infof("namespace controller - deleteCollection - namespace: %s, gvr: %v", namespace, gvr) + + key := operationKey{op: operationDeleteCollection, gvr: gvr} + if !opCache.isSupported(key) { + glog.V(4).Infof("namespace controller - deleteCollection ignored since not supported - namespace: %s, gvr: %v", namespace, gvr) + return false, nil + } + + apiResource := unversioned.APIResource{Name: gvr.Resource, Namespaced: true} + err := dynamicClient.Resource(&apiResource, namespace).DeleteCollection(nil, v1.ListOptions{}) + + if err == nil { + return true, nil + } + + // this is strange, but we need to special case for both MethodNotSupported and NotFound errors + // TODO: https://github.com/kubernetes/kubernetes/issues/22413 + // we have a resource returned in the discovery API that supports no top-level verbs: + // /apis/extensions/v1beta1/namespaces/default/replicationcontrollers + // when working with this resource type, we will get a literal not found error rather than expected method not supported + // remember next time that this resource does not support delete collection... + if errors.IsMethodNotSupported(err) || errors.IsNotFound(err) { + glog.V(4).Infof("namespace controller - deleteCollection not supported - namespace: %s, gvr: %v", namespace, gvr) + opCache[key] = true + return false, nil + } + + glog.V(4).Infof("namespace controller - deleteCollection unexpected error - namespace: %s, gvr: %v, error: %v", namespace, gvr, err) + return true, err +} + +// listCollection will list the items in the specified namespace +// it returns the following: +// the list of items in the collection (if found) +// a boolean if the operation is supported +// an error if the operation is supported but could not be completed. +func listCollection( + dynamicClient *dynamic.Client, + opCache operationNotSupportedCache, + gvr unversioned.GroupVersionResource, + namespace string, +) (*runtime.UnstructuredList, bool, error) { + glog.V(4).Infof("namespace controller - listCollection - namespace: %s, gvr: %v", namespace, gvr) + + key := operationKey{op: operationList, gvr: gvr} + if !opCache.isSupported(key) { + glog.V(4).Infof("namespace controller - listCollection ignored since not supported - namespace: %s, gvr: %v", namespace, gvr) + return nil, false, nil + } + + apiResource := unversioned.APIResource{Name: gvr.Resource, Namespaced: true} + unstructuredList, err := dynamicClient.Resource(&apiResource, namespace).List(v1.ListOptions{}) + if err == nil { + return unstructuredList, true, nil + } + + // this is strange, but we need to special case for both MethodNotSupported and NotFound errors + // TODO: https://github.com/kubernetes/kubernetes/issues/22413 + // we have a resource returned in the discovery API that supports no top-level verbs: + // /apis/extensions/v1beta1/namespaces/default/replicationcontrollers + // when working with this resource type, we will get a literal not found error rather than expected method not supported + // remember next time that this resource does not support delete collection... + if errors.IsMethodNotSupported(err) || errors.IsNotFound(err) { + glog.V(4).Infof("namespace controller - listCollection not supported - namespace: %s, gvr: %v", namespace, gvr) + opCache[key] = true + return nil, false, nil + } + + return nil, true, err +} + +// deleteEachItem is a helper function that will list the collection of resources and delete each item 1 by 1. +func deleteEachItem( + dynamicClient *dynamic.Client, + opCache operationNotSupportedCache, + gvr unversioned.GroupVersionResource, + namespace string, +) error { + glog.V(4).Infof("namespace controller - deleteCollection - namespace: %s, gvr: %v", namespace, gvr) + + unstructuredList, listSupported, err := listCollection(dynamicClient, opCache, gvr, namespace) + if err != nil { + return err + } + if !listSupported { + return nil + } + apiResource := unversioned.APIResource{Name: gvr.Resource, Namespaced: true} + for _, item := range unstructuredList.Items { + if err = dynamicClient.Resource(&apiResource, namespace).Delete(item.Name, nil); err != nil && !errors.IsNotFound(err) && !errors.IsMethodNotSupported(err) { + return err + } + } + return nil +} + +// deleteAllContentForGroupVersionResource will use the dynamic client to delete each resource identified in gvr. +// It returns an estimate of the time remaining before the remaing resources are deleted. +// If estimate > 0, not all resources are guaranteed to be gone. +func deleteAllContentForGroupVersionResource( + kubeClient clientset.Interface, + clientPool dynamic.ClientPool, + opCache operationNotSupportedCache, + gvr unversioned.GroupVersionResource, + namespace string, + namespaceDeletedAt unversioned.Time, +) (int64, error) { + glog.V(4).Infof("namespace controller - deleteAllContentForGroupVersionResource - namespace: %s, gvr: %v", namespace, gvr) + + // estimate how long it will take for the resource to be deleted (needed for objects that support graceful delete) + estimate, err := estimateGracefulTermination(kubeClient, gvr, namespace, namespaceDeletedAt) + if err != nil { + glog.V(4).Infof("namespace controller - deleteAllContentForGroupVersionResource - unable to estimate - namespace: %s, gvr: %v, err: %v", namespace, gvr, err) + return estimate, err + } + glog.V(4).Infof("namespace controller - deleteAllContentForGroupVersionResource - estimate - namespace: %s, gvr: %v, estimate: %v", namespace, gvr, estimate) + + // get a client for this group version... + dynamicClient, err := clientPool.ClientForGroupVersion(gvr.GroupVersion()) + if err != nil { + glog.V(4).Infof("namespace controller - deleteAllContentForGroupVersionResource - unable to get client - namespace: %s, gvr: %v, err: %v", namespace, gvr, err) + return estimate, err + } + + // first try to delete the entire collection + deleteCollectionSupported, err := deleteCollection(dynamicClient, opCache, gvr, namespace) if err != nil { return estimate, err } - err = deleteServices(kubeClient, namespace) - if err != nil { - return estimate, err - } - err = deleteReplicationControllers(kubeClient, namespace) - if err != nil { - return estimate, err - } - estimate, err = deletePods(kubeClient, namespace, before) - if err != nil { - return estimate, err - } - err = deleteSecrets(kubeClient, namespace) - if err != nil { - return estimate, err - } - err = deleteConfigMaps(kubeClient, namespace) - if err != nil { - return estimate, err - } - err = deletePersistentVolumeClaims(kubeClient, namespace) - if err != nil { - return estimate, err - } - err = deleteLimitRanges(kubeClient, namespace) - if err != nil { - return estimate, err - } - err = deleteResourceQuotas(kubeClient, namespace) - if err != nil { - return estimate, err - } - err = deleteEvents(kubeClient, namespace) - if err != nil { - return estimate, err - } - // If experimental mode, delete all experimental resources for the namespace. - if containsVersion(versions, "extensions/v1beta1") { - resources, err := kubeClient.Discovery().ServerResourcesForGroupVersion("extensions/v1beta1") + + // delete collection was not supported, so we list and delete each item... + if !deleteCollectionSupported { + err = deleteEachItem(dynamicClient, opCache, gvr, namespace) if err != nil { return estimate, err } - if containsResource(resources, "horizontalpodautoscalers") { - err = deleteHorizontalPodAutoscalers(kubeClient.Extensions(), namespace) - if err != nil { - return estimate, err - } - } - if containsResource(resources, "ingresses") { - err = deleteIngress(kubeClient.Extensions(), namespace) - if err != nil { - return estimate, err - } - } - if containsResource(resources, "daemonsets") { - err = deleteDaemonSets(kubeClient.Extensions(), namespace) - if err != nil { - return estimate, err - } - } - if containsResource(resources, "jobs") { - err = deleteJobs(kubeClient.Extensions(), namespace) - if err != nil { - return estimate, err - } - } - if containsResource(resources, "deployments") { - err = deleteDeployments(kubeClient.Extensions(), namespace) - if err != nil { - return estimate, err - } - } - if containsResource(resources, "replicasets") { - err = deleteReplicaSets(kubeClient.Extensions(), namespace) - if err != nil { - return estimate, err - } - } + } + + // verify there are no more remaining items + // it is not an error condition for there to be remaining items if local estimate is non-zero + glog.V(4).Infof("namespace controller - deleteAllContentForGroupVersionResource - checking for no more items in namespace: %s, gvr: %v", namespace, gvr) + unstructuredList, listSupported, err := listCollection(dynamicClient, opCache, gvr, namespace) + if err != nil { + glog.V(4).Infof("namespace controller - deleteAllContentForGroupVersionResource - error verifying no items in namespace: %s, gvr: %v, err: %v", namespace, gvr, err) + return estimate, err + } + if !listSupported { + return estimate, nil + } + glog.V(4).Infof("namespace controller - deleteAllContentForGroupVersionResource - items remaining - namespace: %s, gvr: %v, items: %v", namespace, gvr, len(unstructuredList.Items)) + if len(unstructuredList.Items) != 0 && estimate == int64(0) { + return estimate, fmt.Errorf("unexpected items still remain in namespace: %s for gvr: %v", namespace, gvr) } return estimate, nil } +// deleteAllContent will use the dynamic client to delete each resource identified in groupVersionResources. +// It returns an estimate of the time remaining before the remaing resources are deleted. +// If estimate > 0, not all resources are guaranteed to be gone. +func deleteAllContent( + kubeClient clientset.Interface, + clientPool dynamic.ClientPool, + opCache operationNotSupportedCache, + groupVersionResources []unversioned.GroupVersionResource, + namespace string, + namespaceDeletedAt unversioned.Time, +) (int64, error) { + estimate := int64(0) + glog.V(4).Infof("namespace controller - deleteAllContent - namespace: %s, gvrs: %v", namespace, groupVersionResources) + // iterate over each group version, and attempt to delete all of its resources + for _, gvr := range groupVersionResources { + gvrEstimate, err := deleteAllContentForGroupVersionResource(kubeClient, clientPool, opCache, gvr, namespace, namespaceDeletedAt) + if err != nil { + return estimate, err + } + if gvrEstimate > estimate { + estimate = gvrEstimate + } + } + glog.V(4).Infof("namespace controller - deleteAllContent - namespace: %s, estimate: %v", namespace, estimate) + return estimate, nil +} + // syncNamespace orchestrates deletion of a Namespace and its associated content. -func syncNamespace(kubeClient clientset.Interface, versions *unversioned.APIVersions, namespace *api.Namespace) error { +func syncNamespace( + kubeClient clientset.Interface, + clientPool dynamic.ClientPool, + opCache operationNotSupportedCache, + groupVersionResources []unversioned.GroupVersionResource, + namespace *api.Namespace, + finalizerToken api.FinalizerName, +) error { if namespace.DeletionTimestamp == nil { return nil } @@ -211,7 +353,7 @@ func syncNamespace(kubeClient clientset.Interface, versions *unversioned.APIVers return err } - glog.V(4).Infof("Syncing namespace %s", namespace.Name) + glog.V(4).Infof("namespace controller - syncNamespace - namespace: %s, finalizerToken: %s", namespace.Name, finalizerToken) // ensure that the status is up to date on the namespace // if we get a not found error, we assume the namespace is truly gone @@ -233,7 +375,7 @@ func syncNamespace(kubeClient clientset.Interface, versions *unversioned.APIVers } // there may still be content for us to remove - estimate, err := deleteAllContent(kubeClient, versions, namespace.Name, *namespace.DeletionTimestamp) + estimate, err := deleteAllContent(kubeClient, clientPool, opCache, groupVersionResources, namespace.Name, *namespace.DeletionTimestamp) if err != nil { return err } @@ -242,7 +384,7 @@ func syncNamespace(kubeClient clientset.Interface, versions *unversioned.APIVers } // we have removed content, so mark it finalized by us - result, err := retryOnConflictError(kubeClient, namespace, finalizeNamespaceFunc) + result, err := retryOnConflictError(kubeClient, namespace, finalizeNamespaceFunc(finalizerToken)) if err != nil { // in normal practice, this should not be possible, but if a deployment is running // two controllers to do namespace deletion that share a common finalizer token it's @@ -264,125 +406,75 @@ func syncNamespace(kubeClient clientset.Interface, versions *unversioned.APIVers return nil } -func deleteLimitRanges(kubeClient clientset.Interface, ns string) error { - return kubeClient.Core().LimitRanges(ns).DeleteCollection(nil, api.ListOptions{}) -} - -func deleteResourceQuotas(kubeClient clientset.Interface, ns string) error { - return kubeClient.Core().ResourceQuotas(ns).DeleteCollection(nil, api.ListOptions{}) -} - -func deleteServiceAccounts(kubeClient clientset.Interface, ns string) error { - return kubeClient.Core().ServiceAccounts(ns).DeleteCollection(nil, api.ListOptions{}) -} - -func deleteServices(kubeClient clientset.Interface, ns string) error { - items, err := kubeClient.Core().Services(ns).List(api.ListOptions{}) +// estimateGrracefulTermination will estimate the graceful termination required for the specific entity in the namespace +func estimateGracefulTermination(kubeClient clientset.Interface, groupVersionResource unversioned.GroupVersionResource, ns string, namespaceDeletedAt unversioned.Time) (int64, error) { + groupResource := groupVersionResource.GroupResource() + glog.V(4).Infof("namespace controller - estimateGracefulTermination - group %s, resource: %s", groupResource.Group, groupResource.Resource) + estimate := int64(0) + var err error + switch groupResource { + case unversioned.GroupResource{Group: "", Resource: "pods"}: + estimate, err = estimateGracefulTerminationForPods(kubeClient, ns) + } if err != nil { - return err + return estimate, err } - for i := range items.Items { - err := kubeClient.Core().Services(ns).Delete(items.Items[i].Name, nil) - if err != nil && !errors.IsNotFound(err) { - return err - } + // determine if the estimate is greater than the deletion timestamp + duration := time.Since(namespaceDeletedAt.Time) + allowedEstimate := time.Duration(estimate) * time.Second + if duration >= allowedEstimate { + estimate = int64(0) } - return nil + return estimate, nil } -func deleteReplicationControllers(kubeClient clientset.Interface, ns string) error { - return kubeClient.Core().ReplicationControllers(ns).DeleteCollection(nil, api.ListOptions{}) -} - -func deletePods(kubeClient clientset.Interface, ns string, before unversioned.Time) (int64, error) { +// estimateGracefulTerminationForPods determines the graceful termination period for pods in the namespace +func estimateGracefulTerminationForPods(kubeClient clientset.Interface, ns string) (int64, error) { + glog.V(4).Infof("namespace controller - estimateGracefulTerminationForPods - namespace %s", ns) + estimate := int64(0) items, err := kubeClient.Core().Pods(ns).List(api.ListOptions{}) if err != nil { - return 0, err + return estimate, err } - expired := unversioned.Now().After(before.Time) - var deleteOptions *api.DeleteOptions - if expired { - deleteOptions = api.NewDeleteOptions(0) - } - estimate := int64(0) for i := range items.Items { + // filter out terminal pods + phase := items.Items[i].Status.Phase + if api.PodSucceeded == phase || api.PodFailed == phase { + continue + } if items.Items[i].Spec.TerminationGracePeriodSeconds != nil { grace := *items.Items[i].Spec.TerminationGracePeriodSeconds if grace > estimate { estimate = grace } } - err := kubeClient.Core().Pods(ns).Delete(items.Items[i].Name, deleteOptions) - if err != nil && !errors.IsNotFound(err) { - return 0, err - } - } - if expired { - estimate = 0 } return estimate, nil } -func deleteEvents(kubeClient clientset.Interface, ns string) error { - return kubeClient.Core().Events(ns).DeleteCollection(nil, api.ListOptions{}) -} - -func deleteSecrets(kubeClient clientset.Interface, ns string) error { - return kubeClient.Core().Secrets(ns).DeleteCollection(nil, api.ListOptions{}) -} - -func deleteConfigMaps(kubeClient clientset.Interface, ns string) error { - return kubeClient.Core().ConfigMaps(ns).DeleteCollection(nil, api.ListOptions{}) -} - -func deletePersistentVolumeClaims(kubeClient clientset.Interface, ns string) error { - return kubeClient.Core().PersistentVolumeClaims(ns).DeleteCollection(nil, api.ListOptions{}) -} - -func deleteHorizontalPodAutoscalers(expClient unversionedextensions.ExtensionsInterface, ns string) error { - return expClient.HorizontalPodAutoscalers(ns).DeleteCollection(nil, api.ListOptions{}) -} - -func deleteDaemonSets(expClient unversionedextensions.ExtensionsInterface, ns string) error { - return expClient.DaemonSets(ns).DeleteCollection(nil, api.ListOptions{}) -} - -func deleteJobs(expClient unversionedextensions.ExtensionsInterface, ns string) error { - return expClient.Jobs(ns).DeleteCollection(nil, api.ListOptions{}) -} - -func deleteDeployments(expClient unversionedextensions.ExtensionsInterface, ns string) error { - return expClient.Deployments(ns).DeleteCollection(nil, api.ListOptions{}) -} - -func deleteReplicaSets(expClient unversionedextensions.ExtensionsInterface, ns string) error { - return expClient.ReplicaSets(ns).DeleteCollection(nil, api.ListOptions{}) -} - -func deleteIngress(expClient unversionedextensions.ExtensionsInterface, ns string) error { - return expClient.Ingresses(ns).DeleteCollection(nil, api.ListOptions{}) -} - -// TODO: this is duplicated logic. Move it somewhere central? -func containsVersion(versions *unversioned.APIVersions, version string) bool { - for ix := range versions.Versions { - if versions.Versions[ix] == version { - return true +// ServerPreferredNamespacedGroupVersionResources uses the specified client to discover the set of preferred groupVersionResources that are namespaced +func ServerPreferredNamespacedGroupVersionResources(discoveryClient client.DiscoveryInterface) ([]unversioned.GroupVersionResource, error) { + results := []unversioned.GroupVersionResource{} + serverGroupList, err := discoveryClient.ServerGroups() + if err != nil { + return results, err + } + for _, apiGroup := range serverGroupList.Groups { + preferredVersion := apiGroup.PreferredVersion + apiResourceList, err := discoveryClient.ServerResourcesForGroupVersion(preferredVersion.GroupVersion) + if err != nil { + return results, err + } + groupVersion := unversioned.GroupVersion{Group: apiGroup.Name, Version: preferredVersion.Version} + for _, apiResource := range apiResourceList.APIResources { + if !apiResource.Namespaced { + continue + } + if strings.Contains(apiResource.Name, "/") { + continue + } + results = append(results, groupVersion.WithResource(apiResource.Name)) } } - return false -} - -// TODO: this is duplicated logic. Move it somewhere central? -func containsResource(resources *unversioned.APIResourceList, resourceName string) bool { - if resources == nil { - return false - } - for ix := range resources.APIResources { - resource := resources.APIResources[ix] - if resource.Name == resourceName { - return true - } - } - return false + return results, nil }