Move namespace controller to use discovery and dynamic client

This commit is contained in:
derekwaynecarr 2016-03-02 23:34:18 -05:00
parent 3bda581957
commit 41057b02d5
5 changed files with 426 additions and 249 deletions

View File

@ -40,6 +40,7 @@ import (
"k8s.io/kubernetes/pkg/client/leaderelection" "k8s.io/kubernetes/pkg/client/leaderelection"
"k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/typed/dynamic"
client "k8s.io/kubernetes/pkg/client/unversioned" client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd" "k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
"k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/cloudprovider"
@ -266,7 +267,14 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
glog.Fatalf("Failed to get supported resources from server: %v", err) glog.Fatalf("Failed to get supported resources from server: %v", err)
} }
namespaceController := namespacecontroller.NewNamespaceController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "namespace-controller")), versions, s.NamespaceSyncPeriod.Duration) // Find the list of namespaced resources via discovery that the namespace controller must manage
namespaceKubeClient := clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "namespace-controller"))
namespaceClientPool := dynamic.NewClientPool(restclient.AddUserAgent(kubeconfig, "namespace-controller"), dynamic.LegacyAPIPathResolverFunc)
groupVersionResources, err := namespacecontroller.ServerPreferredNamespacedGroupVersionResources(namespaceKubeClient.Discovery())
if err != nil {
glog.Fatalf("Failed to get supported resources from server: %v", err)
}
namespaceController := namespacecontroller.NewNamespaceController(namespaceKubeClient, namespaceClientPool, groupVersionResources, s.NamespaceSyncPeriod.Duration, api.FinalizerKubernetes)
go namespaceController.Run(s.ConcurrentNamespaceSyncs, wait.NeverStop) go namespaceController.Run(s.ConcurrentNamespaceSyncs, wait.NeverStop)
groupVersion := "extensions/v1beta1" groupVersion := "extensions/v1beta1"

View File

@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/typed/dynamic"
client "k8s.io/kubernetes/pkg/client/unversioned" client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd" "k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api" clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api"
@ -214,7 +215,14 @@ func (s *CMServer) Run(_ []string) error {
glog.Fatalf("Failed to get supported resources from server: %v", err) glog.Fatalf("Failed to get supported resources from server: %v", err)
} }
namespaceController := namespacecontroller.NewNamespaceController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "namespace-controller")), versions, s.NamespaceSyncPeriod.Duration) // Find the list of namespaced resources via discovery that the namespace controller must manage
namespaceKubeClient := clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "namespace-controller"))
namespaceClientPool := dynamic.NewClientPool(restclient.AddUserAgent(kubeconfig, "namespace-controller"), dynamic.LegacyAPIPathResolverFunc)
groupVersionResources, err := namespacecontroller.ServerPreferredNamespacedGroupVersionResources(namespaceKubeClient.Discovery())
if err != nil {
glog.Fatalf("Failed to get supported resources from server: %v", err)
}
namespaceController := namespacecontroller.NewNamespaceController(namespaceKubeClient, namespaceClientPool, groupVersionResources, s.NamespaceSyncPeriod.Duration, api.FinalizerKubernetes)
go namespaceController.Run(s.ConcurrentNamespaceSyncs, wait.NeverStop) go namespaceController.Run(s.ConcurrentNamespaceSyncs, wait.NeverStop)
groupVersion := "extensions/v1beta1" groupVersion := "extensions/v1beta1"

View File

@ -23,6 +23,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/typed/dynamic"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
@ -38,23 +39,37 @@ import (
type NamespaceController struct { type NamespaceController struct {
// client that purges namespace content, must have list/delete privileges on all content // client that purges namespace content, must have list/delete privileges on all content
kubeClient clientset.Interface kubeClient clientset.Interface
// clientPool manages a pool of dynamic clients
clientPool dynamic.ClientPool
// store that holds the namespaces // store that holds the namespaces
store cache.Store store cache.Store
// controller that observes the namespaces // controller that observes the namespaces
controller *framework.Controller controller *framework.Controller
// namespaces that have been queued up for processing by workers // namespaces that have been queued up for processing by workers
queue *workqueue.Type queue *workqueue.Type
// list of versions to process // list of preferred group versions and their corresponding resource set for namespace deletion
versions *unversioned.APIVersions groupVersionResources []unversioned.GroupVersionResource
// opCache is a cache to remember if a particular operation is not supported to aid dynamic client.
opCache operationNotSupportedCache
// finalizerToken is the finalizer token managed by this controller
finalizerToken api.FinalizerName
} }
// NewNamespaceController creates a new NamespaceController // NewNamespaceController creates a new NamespaceController
func NewNamespaceController(kubeClient clientset.Interface, versions *unversioned.APIVersions, resyncPeriod time.Duration) *NamespaceController { func NewNamespaceController(
kubeClient clientset.Interface,
clientPool dynamic.ClientPool,
groupVersionResources []unversioned.GroupVersionResource,
resyncPeriod time.Duration,
finalizerToken api.FinalizerName) *NamespaceController {
// create the controller so we can inject the enqueue function // create the controller so we can inject the enqueue function
namespaceController := &NamespaceController{ namespaceController := &NamespaceController{
kubeClient: kubeClient, kubeClient: kubeClient,
versions: versions, clientPool: clientPool,
queue: workqueue.New(), queue: workqueue.New(),
groupVersionResources: groupVersionResources,
opCache: operationNotSupportedCache{},
finalizerToken: finalizerToken,
} }
// configure the backing store/controller // configure the backing store/controller
@ -144,7 +159,7 @@ func (nm *NamespaceController) syncNamespaceFromKey(key string) (err error) {
return err return err
} }
namespace := obj.(*api.Namespace) namespace := obj.(*api.Namespace)
return syncNamespace(nm.kubeClient, nm.versions, namespace) return syncNamespace(nm.kubeClient, nm.clientPool, nm.opCache, nm.groupVersionResources, namespace, nm.finalizerToken)
} }
// Run starts observing the system with the specified number of workers. // Run starts observing the system with the specified number of workers.

View File

@ -18,7 +18,11 @@ package namespace
import ( import (
"fmt" "fmt"
"net/http"
"net/http/httptest"
"path"
"strings" "strings"
"sync"
"testing" "testing"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
@ -26,7 +30,9 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/testing/core" "k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/client/typed/dynamic"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
) )
@ -56,7 +62,7 @@ func TestFinalizeNamespaceFunc(t *testing.T) {
Finalizers: []api.FinalizerName{"kubernetes", "other"}, Finalizers: []api.FinalizerName{"kubernetes", "other"},
}, },
} }
finalizeNamespaceFunc(mockClient, testNamespace) finalizeNamespace(mockClient, testNamespace, api.FinalizerKubernetes)
actions := mockClient.Actions() actions := mockClient.Actions()
if len(actions) != 1 { if len(actions) != 1 {
t.Errorf("Expected 1 mock client action, but got %v", len(actions)) t.Errorf("Expected 1 mock client action, but got %v", len(actions))
@ -75,9 +81,10 @@ func TestFinalizeNamespaceFunc(t *testing.T) {
func testSyncNamespaceThatIsTerminating(t *testing.T, versions *unversioned.APIVersions) { func testSyncNamespaceThatIsTerminating(t *testing.T, versions *unversioned.APIVersions) {
now := unversioned.Now() now := unversioned.Now()
namespaceName := "test"
testNamespacePendingFinalize := &api.Namespace{ testNamespacePendingFinalize := &api.Namespace{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Name: "test", Name: namespaceName,
ResourceVersion: "1", ResourceVersion: "1",
DeletionTimestamp: &now, DeletionTimestamp: &now,
}, },
@ -90,7 +97,7 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *unversioned.APIV
} }
testNamespaceFinalizeComplete := &api.Namespace{ testNamespaceFinalizeComplete := &api.Namespace{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Name: "test", Name: namespaceName,
ResourceVersion: "1", ResourceVersion: "1",
DeletionTimestamp: &now, DeletionTimestamp: &now,
}, },
@ -100,78 +107,77 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *unversioned.APIV
}, },
} }
// TODO: Reuse the constants for all these strings from testclient // when doing a delete all of content, we will do a GET of a collection, and DELETE of a collection by default
pendingActionSet := sets.NewString( dynamicClientActionSet := sets.NewString()
strings.Join([]string{"get", "namespaces", ""}, "-"), groupVersionResources := testGroupVersionResources()
strings.Join([]string{"delete-collection", "replicationcontrollers", ""}, "-"), for _, groupVersionResource := range groupVersionResources {
strings.Join([]string{"list", "services", ""}, "-"), urlPath := path.Join([]string{
strings.Join([]string{"list", "pods", ""}, "-"), dynamic.LegacyAPIPathResolverFunc(groupVersionResource.GroupVersion()),
strings.Join([]string{"delete-collection", "resourcequotas", ""}, "-"), groupVersionResource.Group,
strings.Join([]string{"delete-collection", "secrets", ""}, "-"), groupVersionResource.Version,
strings.Join([]string{"delete-collection", "configmaps", ""}, "-"), "namespaces",
strings.Join([]string{"delete-collection", "limitranges", ""}, "-"), namespaceName,
strings.Join([]string{"delete-collection", "events", ""}, "-"), groupVersionResource.Resource,
strings.Join([]string{"delete-collection", "serviceaccounts", ""}, "-"), }...)
strings.Join([]string{"delete-collection", "persistentvolumeclaims", ""}, "-"), dynamicClientActionSet.Insert((&fakeAction{method: "GET", path: urlPath}).String())
strings.Join([]string{"create", "namespaces", "finalize"}, "-"), dynamicClientActionSet.Insert((&fakeAction{method: "DELETE", path: urlPath}).String())
)
if containsVersion(versions, "extensions/v1beta1") {
pendingActionSet.Insert(
strings.Join([]string{"delete-collection", "daemonsets", ""}, "-"),
strings.Join([]string{"delete-collection", "deployments", ""}, "-"),
strings.Join([]string{"delete-collection", "replicasets", ""}, "-"),
strings.Join([]string{"delete-collection", "jobs", ""}, "-"),
strings.Join([]string{"delete-collection", "horizontalpodautoscalers", ""}, "-"),
strings.Join([]string{"delete-collection", "ingresses", ""}, "-"),
strings.Join([]string{"get", "resource", ""}, "-"),
)
} }
scenarios := map[string]struct { scenarios := map[string]struct {
testNamespace *api.Namespace testNamespace *api.Namespace
expectedActionSet sets.String kubeClientActionSet sets.String
dynamicClientActionSet sets.String
}{ }{
"pending-finalize": { "pending-finalize": {
testNamespace: testNamespacePendingFinalize, testNamespace: testNamespacePendingFinalize,
expectedActionSet: pendingActionSet, kubeClientActionSet: sets.NewString(
strings.Join([]string{"get", "namespaces", ""}, "-"),
strings.Join([]string{"list", "pods", ""}, "-"),
strings.Join([]string{"create", "namespaces", "finalize"}, "-"),
),
dynamicClientActionSet: dynamicClientActionSet,
}, },
"complete-finalize": { "complete-finalize": {
testNamespace: testNamespaceFinalizeComplete, testNamespace: testNamespaceFinalizeComplete,
expectedActionSet: sets.NewString( kubeClientActionSet: sets.NewString(
strings.Join([]string{"get", "namespaces", ""}, "-"), strings.Join([]string{"get", "namespaces", ""}, "-"),
strings.Join([]string{"delete", "namespaces", ""}, "-"), strings.Join([]string{"delete", "namespaces", ""}, "-"),
), ),
dynamicClientActionSet: sets.NewString(),
}, },
} }
for scenario, testInput := range scenarios { for scenario, testInput := range scenarios {
testHandler := &fakeActionHandler{statusCode: 200}
srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP)
defer srv.Close()
mockClient := fake.NewSimpleClientset(testInput.testNamespace) mockClient := fake.NewSimpleClientset(testInput.testNamespace)
if containsVersion(versions, "extensions/v1beta1") { clientPool := dynamic.NewClientPool(clientConfig, dynamic.LegacyAPIPathResolverFunc)
resources := []unversioned.APIResource{}
for _, resource := range []string{"daemonsets", "deployments", "replicasets", "jobs", "horizontalpodautoscalers", "ingresses"} { err := syncNamespace(mockClient, clientPool, operationNotSupportedCache{}, groupVersionResources, testInput.testNamespace, api.FinalizerKubernetes)
resources = append(resources, unversioned.APIResource{Name: resource})
}
mockClient.Resources = map[string]*unversioned.APIResourceList{
"extensions/v1beta1": {
GroupVersion: "extensions/v1beta1",
APIResources: resources,
},
}
}
err := syncNamespace(mockClient, versions, testInput.testNamespace)
if err != nil { if err != nil {
t.Errorf("scenario %s - Unexpected error when synching namespace %v", scenario, err) t.Errorf("scenario %s - Unexpected error when synching namespace %v", scenario, err)
} }
// validate traffic from kube client
actionSet := sets.NewString() actionSet := sets.NewString()
for _, action := range mockClient.Actions() { for _, action := range mockClient.Actions() {
actionSet.Insert(strings.Join([]string{action.GetVerb(), action.GetResource(), action.GetSubresource()}, "-")) actionSet.Insert(strings.Join([]string{action.GetVerb(), action.GetResource(), action.GetSubresource()}, "-"))
} }
if !actionSet.HasAll(testInput.expectedActionSet.List()...) { if !actionSet.Equal(testInput.kubeClientActionSet) {
t.Errorf("scenario %s - Expected actions:\n%v\n but got:\n%v\nDifference:\n%v", scenario, testInput.expectedActionSet, actionSet, testInput.expectedActionSet.Difference(actionSet)) t.Errorf("scenario %s - mock client expected actions:\n%v\n but got:\n%v\nDifference:\n%v", scenario,
testInput.kubeClientActionSet, actionSet, testInput.kubeClientActionSet.Difference(actionSet))
} }
if !testInput.expectedActionSet.HasAll(actionSet.List()...) {
t.Errorf("scenario %s - Expected actions:\n%v\n but got:\n%v\nDifference:\n%v", scenario, testInput.expectedActionSet, actionSet, actionSet.Difference(testInput.expectedActionSet)) // validate traffic from dynamic client
actionSet = sets.NewString()
for _, action := range testHandler.actions {
actionSet.Insert(action.String())
}
if !actionSet.Equal(testInput.dynamicClientActionSet) {
t.Errorf("scenario %s - dynamic client expected actions:\n%v\n but got:\n%v\nDifference:\n%v", scenario,
testInput.dynamicClientActionSet, actionSet, testInput.dynamicClientActionSet.Difference(actionSet))
} }
} }
} }
@ -218,7 +224,7 @@ func TestSyncNamespaceThatIsActive(t *testing.T) {
Phase: api.NamespaceActive, Phase: api.NamespaceActive,
}, },
} }
err := syncNamespace(mockClient, &unversioned.APIVersions{}, testNamespace) err := syncNamespace(mockClient, nil, operationNotSupportedCache{}, testGroupVersionResources(), testNamespace, api.FinalizerKubernetes)
if err != nil { if err != nil {
t.Errorf("Unexpected error when synching namespace %v", err) t.Errorf("Unexpected error when synching namespace %v", err)
} }
@ -226,3 +232,51 @@ func TestSyncNamespaceThatIsActive(t *testing.T) {
t.Errorf("Expected no action from controller, but got: %v", mockClient.Actions()) t.Errorf("Expected no action from controller, but got: %v", mockClient.Actions())
} }
} }
// testServerAndClientConfig returns a server that listens and a config that can reference it
func testServerAndClientConfig(handler func(http.ResponseWriter, *http.Request)) (*httptest.Server, *restclient.Config) {
srv := httptest.NewServer(http.HandlerFunc(handler))
config := &restclient.Config{
Host: srv.URL,
}
return srv, config
}
// fakeAction records information about requests to aid in testing.
type fakeAction struct {
method string
path string
}
// String returns method=path to aid in testing
func (f *fakeAction) String() string {
return strings.Join([]string{f.method, f.path}, "=")
}
// fakeActionHandler holds a list of fakeActions received
type fakeActionHandler struct {
// statusCode returned by this handler
statusCode int
lock sync.Mutex
actions []fakeAction
}
// ServeHTTP logs the action that occurred and always returns the associated status code
func (f *fakeActionHandler) ServeHTTP(response http.ResponseWriter, request *http.Request) {
f.lock.Lock()
defer f.lock.Unlock()
f.actions = append(f.actions, fakeAction{method: request.Method, path: request.URL.Path})
response.WriteHeader(f.statusCode)
response.Write([]byte("{\"kind\": \"List\"}"))
}
// testGroupVersionResources returns a mocked up set of resources across different api groups for testing namespace controller.
func testGroupVersionResources() []unversioned.GroupVersionResource {
results := []unversioned.GroupVersionResource{}
results = append(results, unversioned.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"})
results = append(results, unversioned.GroupVersionResource{Group: "", Version: "v1", Resource: "services"})
results = append(results, unversioned.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "deployments"})
return results
}

View File

@ -18,12 +18,17 @@ package namespace
import ( import (
"fmt" "fmt"
"strings"
"time"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/v1"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
unversionedextensions "k8s.io/kubernetes/pkg/client/typed/generated/extensions/unversioned" "k8s.io/kubernetes/pkg/client/typed/dynamic"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
"github.com/golang/glog" "github.com/golang/glog"
@ -38,6 +43,29 @@ func (e *contentRemainingError) Error() string {
return fmt.Sprintf("some content remains in the namespace, estimate %d seconds before it is removed", e.Estimate) return fmt.Sprintf("some content remains in the namespace, estimate %d seconds before it is removed", e.Estimate)
} }
// operation is used for caching if an operation is supported on a dynamic client.
type operation string
const (
operationDeleteCollection operation = "deleteCollection"
operationList operation = "list"
)
// operationKey is an entry in a cache.
type operationKey struct {
op operation
gvr unversioned.GroupVersionResource
}
// operationNotSupportedCache is a simple cache to remember if an operation is not supported for a resource.
// if the operationKey maps to true, it means the operation is not supported.
type operationNotSupportedCache map[operationKey]bool
// isSupported returns true if the operation is supported
func (o operationNotSupportedCache) isSupported(key operationKey) bool {
return !o[key]
}
// updateNamespaceFunc is a function that makes an update to a namespace // updateNamespaceFunc is a function that makes an update to a namespace
type updateNamespaceFunc func(kubeClient clientset.Interface, namespace *api.Namespace) (*api.Namespace, error) type updateNamespaceFunc func(kubeClient clientset.Interface, namespace *api.Namespace) (*api.Namespace, error)
@ -58,7 +86,6 @@ func retryOnConflictError(kubeClient clientset.Interface, namespace *api.Namespa
return nil, err return nil, err
} }
} }
return
} }
// updateNamespaceStatusFunc will verify that the status of the namespace is correct // updateNamespaceStatusFunc will verify that the status of the namespace is correct
@ -78,14 +105,21 @@ func finalized(namespace *api.Namespace) bool {
return len(namespace.Spec.Finalizers) == 0 return len(namespace.Spec.Finalizers) == 0
} }
// finalizeNamespaceFunc removes the kubernetes token and finalizes the namespace // finalizeNamespaceFunc returns a function that knows how to finalize a namespace for specified token.
func finalizeNamespaceFunc(kubeClient clientset.Interface, namespace *api.Namespace) (*api.Namespace, error) { func finalizeNamespaceFunc(finalizerToken api.FinalizerName) updateNamespaceFunc {
return func(kubeClient clientset.Interface, namespace *api.Namespace) (*api.Namespace, error) {
return finalizeNamespace(kubeClient, namespace, finalizerToken)
}
}
// finalizeNamespace removes the specified finalizerToken and finalizes the namespace
func finalizeNamespace(kubeClient clientset.Interface, namespace *api.Namespace, finalizerToken api.FinalizerName) (*api.Namespace, error) {
namespaceFinalize := api.Namespace{} namespaceFinalize := api.Namespace{}
namespaceFinalize.ObjectMeta = namespace.ObjectMeta namespaceFinalize.ObjectMeta = namespace.ObjectMeta
namespaceFinalize.Spec = namespace.Spec namespaceFinalize.Spec = namespace.Spec
finalizerSet := sets.NewString() finalizerSet := sets.NewString()
for i := range namespace.Spec.Finalizers { for i := range namespace.Spec.Finalizers {
if namespace.Spec.Finalizers[i] != api.FinalizerKubernetes { if namespace.Spec.Finalizers[i] != finalizerToken {
finalizerSet.Insert(string(namespace.Spec.Finalizers[i])) finalizerSet.Insert(string(namespace.Spec.Finalizers[i]))
} }
} }
@ -103,99 +137,207 @@ func finalizeNamespaceFunc(kubeClient clientset.Interface, namespace *api.Namesp
return namespace, err return namespace, err
} }
// deleteAllContent will delete all content known to the system in a namespace. It returns an estimate // deleteCollection is a helper function that will delete the collection of resources
// of the time remaining before the remaining resources are deleted. If estimate > 0 not all resources // it returns true if the operation was supported on the server.
// are guaranteed to be gone. // it returns an error if the operation was supported on the server but was unable to complete.
// TODO: this should use discovery to delete arbitrary namespace content func deleteCollection(
func deleteAllContent(kubeClient clientset.Interface, versions *unversioned.APIVersions, namespace string, before unversioned.Time) (estimate int64, err error) { dynamicClient *dynamic.Client,
err = deleteServiceAccounts(kubeClient, namespace) opCache operationNotSupportedCache,
gvr unversioned.GroupVersionResource,
namespace string,
) (bool, error) {
glog.V(4).Infof("namespace controller - deleteCollection - namespace: %s, gvr: %v", namespace, gvr)
key := operationKey{op: operationDeleteCollection, gvr: gvr}
if !opCache.isSupported(key) {
glog.V(4).Infof("namespace controller - deleteCollection ignored since not supported - namespace: %s, gvr: %v", namespace, gvr)
return false, nil
}
apiResource := unversioned.APIResource{Name: gvr.Resource, Namespaced: true}
err := dynamicClient.Resource(&apiResource, namespace).DeleteCollection(nil, v1.ListOptions{})
if err == nil {
return true, nil
}
// this is strange, but we need to special case for both MethodNotSupported and NotFound errors
// TODO: https://github.com/kubernetes/kubernetes/issues/22413
// we have a resource returned in the discovery API that supports no top-level verbs:
// /apis/extensions/v1beta1/namespaces/default/replicationcontrollers
// when working with this resource type, we will get a literal not found error rather than expected method not supported
// remember next time that this resource does not support delete collection...
if errors.IsMethodNotSupported(err) || errors.IsNotFound(err) {
glog.V(4).Infof("namespace controller - deleteCollection not supported - namespace: %s, gvr: %v", namespace, gvr)
opCache[key] = true
return false, nil
}
glog.V(4).Infof("namespace controller - deleteCollection unexpected error - namespace: %s, gvr: %v, error: %v", namespace, gvr, err)
return true, err
}
// listCollection will list the items in the specified namespace
// it returns the following:
// the list of items in the collection (if found)
// a boolean if the operation is supported
// an error if the operation is supported but could not be completed.
func listCollection(
dynamicClient *dynamic.Client,
opCache operationNotSupportedCache,
gvr unversioned.GroupVersionResource,
namespace string,
) (*runtime.UnstructuredList, bool, error) {
glog.V(4).Infof("namespace controller - listCollection - namespace: %s, gvr: %v", namespace, gvr)
key := operationKey{op: operationList, gvr: gvr}
if !opCache.isSupported(key) {
glog.V(4).Infof("namespace controller - listCollection ignored since not supported - namespace: %s, gvr: %v", namespace, gvr)
return nil, false, nil
}
apiResource := unversioned.APIResource{Name: gvr.Resource, Namespaced: true}
unstructuredList, err := dynamicClient.Resource(&apiResource, namespace).List(v1.ListOptions{})
if err == nil {
return unstructuredList, true, nil
}
// this is strange, but we need to special case for both MethodNotSupported and NotFound errors
// TODO: https://github.com/kubernetes/kubernetes/issues/22413
// we have a resource returned in the discovery API that supports no top-level verbs:
// /apis/extensions/v1beta1/namespaces/default/replicationcontrollers
// when working with this resource type, we will get a literal not found error rather than expected method not supported
// remember next time that this resource does not support delete collection...
if errors.IsMethodNotSupported(err) || errors.IsNotFound(err) {
glog.V(4).Infof("namespace controller - listCollection not supported - namespace: %s, gvr: %v", namespace, gvr)
opCache[key] = true
return nil, false, nil
}
return nil, true, err
}
// deleteEachItem is a helper function that will list the collection of resources and delete each item 1 by 1.
func deleteEachItem(
dynamicClient *dynamic.Client,
opCache operationNotSupportedCache,
gvr unversioned.GroupVersionResource,
namespace string,
) error {
glog.V(4).Infof("namespace controller - deleteCollection - namespace: %s, gvr: %v", namespace, gvr)
unstructuredList, listSupported, err := listCollection(dynamicClient, opCache, gvr, namespace)
if err != nil {
return err
}
if !listSupported {
return nil
}
apiResource := unversioned.APIResource{Name: gvr.Resource, Namespaced: true}
for _, item := range unstructuredList.Items {
if err = dynamicClient.Resource(&apiResource, namespace).Delete(item.Name, nil); err != nil && !errors.IsNotFound(err) && !errors.IsMethodNotSupported(err) {
return err
}
}
return nil
}
// deleteAllContentForGroupVersionResource will use the dynamic client to delete each resource identified in gvr.
// It returns an estimate of the time remaining before the remaing resources are deleted.
// If estimate > 0, not all resources are guaranteed to be gone.
func deleteAllContentForGroupVersionResource(
kubeClient clientset.Interface,
clientPool dynamic.ClientPool,
opCache operationNotSupportedCache,
gvr unversioned.GroupVersionResource,
namespace string,
namespaceDeletedAt unversioned.Time,
) (int64, error) {
glog.V(4).Infof("namespace controller - deleteAllContentForGroupVersionResource - namespace: %s, gvr: %v", namespace, gvr)
// estimate how long it will take for the resource to be deleted (needed for objects that support graceful delete)
estimate, err := estimateGracefulTermination(kubeClient, gvr, namespace, namespaceDeletedAt)
if err != nil {
glog.V(4).Infof("namespace controller - deleteAllContentForGroupVersionResource - unable to estimate - namespace: %s, gvr: %v, err: %v", namespace, gvr, err)
return estimate, err
}
glog.V(4).Infof("namespace controller - deleteAllContentForGroupVersionResource - estimate - namespace: %s, gvr: %v, estimate: %v", namespace, gvr, estimate)
// get a client for this group version...
dynamicClient, err := clientPool.ClientForGroupVersion(gvr.GroupVersion())
if err != nil {
glog.V(4).Infof("namespace controller - deleteAllContentForGroupVersionResource - unable to get client - namespace: %s, gvr: %v, err: %v", namespace, gvr, err)
return estimate, err
}
// first try to delete the entire collection
deleteCollectionSupported, err := deleteCollection(dynamicClient, opCache, gvr, namespace)
if err != nil { if err != nil {
return estimate, err return estimate, err
} }
err = deleteServices(kubeClient, namespace)
if err != nil { // delete collection was not supported, so we list and delete each item...
return estimate, err if !deleteCollectionSupported {
} err = deleteEachItem(dynamicClient, opCache, gvr, namespace)
err = deleteReplicationControllers(kubeClient, namespace)
if err != nil {
return estimate, err
}
estimate, err = deletePods(kubeClient, namespace, before)
if err != nil {
return estimate, err
}
err = deleteSecrets(kubeClient, namespace)
if err != nil {
return estimate, err
}
err = deleteConfigMaps(kubeClient, namespace)
if err != nil {
return estimate, err
}
err = deletePersistentVolumeClaims(kubeClient, namespace)
if err != nil {
return estimate, err
}
err = deleteLimitRanges(kubeClient, namespace)
if err != nil {
return estimate, err
}
err = deleteResourceQuotas(kubeClient, namespace)
if err != nil {
return estimate, err
}
err = deleteEvents(kubeClient, namespace)
if err != nil {
return estimate, err
}
// If experimental mode, delete all experimental resources for the namespace.
if containsVersion(versions, "extensions/v1beta1") {
resources, err := kubeClient.Discovery().ServerResourcesForGroupVersion("extensions/v1beta1")
if err != nil { if err != nil {
return estimate, err return estimate, err
} }
if containsResource(resources, "horizontalpodautoscalers") { }
err = deleteHorizontalPodAutoscalers(kubeClient.Extensions(), namespace)
if err != nil { // verify there are no more remaining items
return estimate, err // it is not an error condition for there to be remaining items if local estimate is non-zero
} glog.V(4).Infof("namespace controller - deleteAllContentForGroupVersionResource - checking for no more items in namespace: %s, gvr: %v", namespace, gvr)
} unstructuredList, listSupported, err := listCollection(dynamicClient, opCache, gvr, namespace)
if containsResource(resources, "ingresses") { if err != nil {
err = deleteIngress(kubeClient.Extensions(), namespace) glog.V(4).Infof("namespace controller - deleteAllContentForGroupVersionResource - error verifying no items in namespace: %s, gvr: %v, err: %v", namespace, gvr, err)
if err != nil { return estimate, err
return estimate, err }
} if !listSupported {
} return estimate, nil
if containsResource(resources, "daemonsets") { }
err = deleteDaemonSets(kubeClient.Extensions(), namespace) glog.V(4).Infof("namespace controller - deleteAllContentForGroupVersionResource - items remaining - namespace: %s, gvr: %v, items: %v", namespace, gvr, len(unstructuredList.Items))
if err != nil { if len(unstructuredList.Items) != 0 && estimate == int64(0) {
return estimate, err return estimate, fmt.Errorf("unexpected items still remain in namespace: %s for gvr: %v", namespace, gvr)
}
}
if containsResource(resources, "jobs") {
err = deleteJobs(kubeClient.Extensions(), namespace)
if err != nil {
return estimate, err
}
}
if containsResource(resources, "deployments") {
err = deleteDeployments(kubeClient.Extensions(), namespace)
if err != nil {
return estimate, err
}
}
if containsResource(resources, "replicasets") {
err = deleteReplicaSets(kubeClient.Extensions(), namespace)
if err != nil {
return estimate, err
}
}
} }
return estimate, nil return estimate, nil
} }
// deleteAllContent will use the dynamic client to delete each resource identified in groupVersionResources.
// It returns an estimate of the time remaining before the remaing resources are deleted.
// If estimate > 0, not all resources are guaranteed to be gone.
func deleteAllContent(
kubeClient clientset.Interface,
clientPool dynamic.ClientPool,
opCache operationNotSupportedCache,
groupVersionResources []unversioned.GroupVersionResource,
namespace string,
namespaceDeletedAt unversioned.Time,
) (int64, error) {
estimate := int64(0)
glog.V(4).Infof("namespace controller - deleteAllContent - namespace: %s, gvrs: %v", namespace, groupVersionResources)
// iterate over each group version, and attempt to delete all of its resources
for _, gvr := range groupVersionResources {
gvrEstimate, err := deleteAllContentForGroupVersionResource(kubeClient, clientPool, opCache, gvr, namespace, namespaceDeletedAt)
if err != nil {
return estimate, err
}
if gvrEstimate > estimate {
estimate = gvrEstimate
}
}
glog.V(4).Infof("namespace controller - deleteAllContent - namespace: %s, estimate: %v", namespace, estimate)
return estimate, nil
}
// syncNamespace orchestrates deletion of a Namespace and its associated content. // syncNamespace orchestrates deletion of a Namespace and its associated content.
func syncNamespace(kubeClient clientset.Interface, versions *unversioned.APIVersions, namespace *api.Namespace) error { func syncNamespace(
kubeClient clientset.Interface,
clientPool dynamic.ClientPool,
opCache operationNotSupportedCache,
groupVersionResources []unversioned.GroupVersionResource,
namespace *api.Namespace,
finalizerToken api.FinalizerName,
) error {
if namespace.DeletionTimestamp == nil { if namespace.DeletionTimestamp == nil {
return nil return nil
} }
@ -211,7 +353,7 @@ func syncNamespace(kubeClient clientset.Interface, versions *unversioned.APIVers
return err return err
} }
glog.V(4).Infof("Syncing namespace %s", namespace.Name) glog.V(4).Infof("namespace controller - syncNamespace - namespace: %s, finalizerToken: %s", namespace.Name, finalizerToken)
// ensure that the status is up to date on the namespace // ensure that the status is up to date on the namespace
// if we get a not found error, we assume the namespace is truly gone // if we get a not found error, we assume the namespace is truly gone
@ -233,7 +375,7 @@ func syncNamespace(kubeClient clientset.Interface, versions *unversioned.APIVers
} }
// there may still be content for us to remove // there may still be content for us to remove
estimate, err := deleteAllContent(kubeClient, versions, namespace.Name, *namespace.DeletionTimestamp) estimate, err := deleteAllContent(kubeClient, clientPool, opCache, groupVersionResources, namespace.Name, *namespace.DeletionTimestamp)
if err != nil { if err != nil {
return err return err
} }
@ -242,7 +384,7 @@ func syncNamespace(kubeClient clientset.Interface, versions *unversioned.APIVers
} }
// we have removed content, so mark it finalized by us // we have removed content, so mark it finalized by us
result, err := retryOnConflictError(kubeClient, namespace, finalizeNamespaceFunc) result, err := retryOnConflictError(kubeClient, namespace, finalizeNamespaceFunc(finalizerToken))
if err != nil { if err != nil {
// in normal practice, this should not be possible, but if a deployment is running // in normal practice, this should not be possible, but if a deployment is running
// two controllers to do namespace deletion that share a common finalizer token it's // two controllers to do namespace deletion that share a common finalizer token it's
@ -264,125 +406,75 @@ func syncNamespace(kubeClient clientset.Interface, versions *unversioned.APIVers
return nil return nil
} }
func deleteLimitRanges(kubeClient clientset.Interface, ns string) error { // estimateGrracefulTermination will estimate the graceful termination required for the specific entity in the namespace
return kubeClient.Core().LimitRanges(ns).DeleteCollection(nil, api.ListOptions{}) func estimateGracefulTermination(kubeClient clientset.Interface, groupVersionResource unversioned.GroupVersionResource, ns string, namespaceDeletedAt unversioned.Time) (int64, error) {
} groupResource := groupVersionResource.GroupResource()
glog.V(4).Infof("namespace controller - estimateGracefulTermination - group %s, resource: %s", groupResource.Group, groupResource.Resource)
func deleteResourceQuotas(kubeClient clientset.Interface, ns string) error { estimate := int64(0)
return kubeClient.Core().ResourceQuotas(ns).DeleteCollection(nil, api.ListOptions{}) var err error
} switch groupResource {
case unversioned.GroupResource{Group: "", Resource: "pods"}:
func deleteServiceAccounts(kubeClient clientset.Interface, ns string) error { estimate, err = estimateGracefulTerminationForPods(kubeClient, ns)
return kubeClient.Core().ServiceAccounts(ns).DeleteCollection(nil, api.ListOptions{}) }
}
func deleteServices(kubeClient clientset.Interface, ns string) error {
items, err := kubeClient.Core().Services(ns).List(api.ListOptions{})
if err != nil { if err != nil {
return err return estimate, err
} }
for i := range items.Items { // determine if the estimate is greater than the deletion timestamp
err := kubeClient.Core().Services(ns).Delete(items.Items[i].Name, nil) duration := time.Since(namespaceDeletedAt.Time)
if err != nil && !errors.IsNotFound(err) { allowedEstimate := time.Duration(estimate) * time.Second
return err if duration >= allowedEstimate {
} estimate = int64(0)
} }
return nil return estimate, nil
} }
func deleteReplicationControllers(kubeClient clientset.Interface, ns string) error { // estimateGracefulTerminationForPods determines the graceful termination period for pods in the namespace
return kubeClient.Core().ReplicationControllers(ns).DeleteCollection(nil, api.ListOptions{}) func estimateGracefulTerminationForPods(kubeClient clientset.Interface, ns string) (int64, error) {
} glog.V(4).Infof("namespace controller - estimateGracefulTerminationForPods - namespace %s", ns)
estimate := int64(0)
func deletePods(kubeClient clientset.Interface, ns string, before unversioned.Time) (int64, error) {
items, err := kubeClient.Core().Pods(ns).List(api.ListOptions{}) items, err := kubeClient.Core().Pods(ns).List(api.ListOptions{})
if err != nil { if err != nil {
return 0, err return estimate, err
} }
expired := unversioned.Now().After(before.Time)
var deleteOptions *api.DeleteOptions
if expired {
deleteOptions = api.NewDeleteOptions(0)
}
estimate := int64(0)
for i := range items.Items { for i := range items.Items {
// filter out terminal pods
phase := items.Items[i].Status.Phase
if api.PodSucceeded == phase || api.PodFailed == phase {
continue
}
if items.Items[i].Spec.TerminationGracePeriodSeconds != nil { if items.Items[i].Spec.TerminationGracePeriodSeconds != nil {
grace := *items.Items[i].Spec.TerminationGracePeriodSeconds grace := *items.Items[i].Spec.TerminationGracePeriodSeconds
if grace > estimate { if grace > estimate {
estimate = grace estimate = grace
} }
} }
err := kubeClient.Core().Pods(ns).Delete(items.Items[i].Name, deleteOptions)
if err != nil && !errors.IsNotFound(err) {
return 0, err
}
}
if expired {
estimate = 0
} }
return estimate, nil return estimate, nil
} }
func deleteEvents(kubeClient clientset.Interface, ns string) error { // ServerPreferredNamespacedGroupVersionResources uses the specified client to discover the set of preferred groupVersionResources that are namespaced
return kubeClient.Core().Events(ns).DeleteCollection(nil, api.ListOptions{}) func ServerPreferredNamespacedGroupVersionResources(discoveryClient client.DiscoveryInterface) ([]unversioned.GroupVersionResource, error) {
} results := []unversioned.GroupVersionResource{}
serverGroupList, err := discoveryClient.ServerGroups()
func deleteSecrets(kubeClient clientset.Interface, ns string) error { if err != nil {
return kubeClient.Core().Secrets(ns).DeleteCollection(nil, api.ListOptions{}) return results, err
} }
for _, apiGroup := range serverGroupList.Groups {
func deleteConfigMaps(kubeClient clientset.Interface, ns string) error { preferredVersion := apiGroup.PreferredVersion
return kubeClient.Core().ConfigMaps(ns).DeleteCollection(nil, api.ListOptions{}) apiResourceList, err := discoveryClient.ServerResourcesForGroupVersion(preferredVersion.GroupVersion)
} if err != nil {
return results, err
func deletePersistentVolumeClaims(kubeClient clientset.Interface, ns string) error { }
return kubeClient.Core().PersistentVolumeClaims(ns).DeleteCollection(nil, api.ListOptions{}) groupVersion := unversioned.GroupVersion{Group: apiGroup.Name, Version: preferredVersion.Version}
} for _, apiResource := range apiResourceList.APIResources {
if !apiResource.Namespaced {
func deleteHorizontalPodAutoscalers(expClient unversionedextensions.ExtensionsInterface, ns string) error { continue
return expClient.HorizontalPodAutoscalers(ns).DeleteCollection(nil, api.ListOptions{}) }
} if strings.Contains(apiResource.Name, "/") {
continue
func deleteDaemonSets(expClient unversionedextensions.ExtensionsInterface, ns string) error { }
return expClient.DaemonSets(ns).DeleteCollection(nil, api.ListOptions{}) results = append(results, groupVersion.WithResource(apiResource.Name))
}
func deleteJobs(expClient unversionedextensions.ExtensionsInterface, ns string) error {
return expClient.Jobs(ns).DeleteCollection(nil, api.ListOptions{})
}
func deleteDeployments(expClient unversionedextensions.ExtensionsInterface, ns string) error {
return expClient.Deployments(ns).DeleteCollection(nil, api.ListOptions{})
}
func deleteReplicaSets(expClient unversionedextensions.ExtensionsInterface, ns string) error {
return expClient.ReplicaSets(ns).DeleteCollection(nil, api.ListOptions{})
}
func deleteIngress(expClient unversionedextensions.ExtensionsInterface, ns string) error {
return expClient.Ingresses(ns).DeleteCollection(nil, api.ListOptions{})
}
// TODO: this is duplicated logic. Move it somewhere central?
func containsVersion(versions *unversioned.APIVersions, version string) bool {
for ix := range versions.Versions {
if versions.Versions[ix] == version {
return true
} }
} }
return false return results, nil
}
// TODO: this is duplicated logic. Move it somewhere central?
func containsResource(resources *unversioned.APIResourceList, resourceName string) bool {
if resources == nil {
return false
}
for ix := range resources.APIResources {
resource := resources.APIResources[ix]
if resource.Name == resourceName {
return true
}
}
return false
} }