Updating federation namespace controller to share namespaced resources deletion code with kube namespace controller

This commit is contained in:
nikhiljindal 2016-10-31 16:43:32 -07:00
parent c1c2a12134
commit 74676f6995
7 changed files with 138 additions and 197 deletions

View File

@ -30,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apiserver/pkg/server/healthz"
utilflag "k8s.io/apiserver/pkg/util/flag"
"k8s.io/client-go/dynamic"
restclient "k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
federationclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset"
@ -167,7 +168,7 @@ func StartControllers(s *options.CMServer, restClientCfg *restclient.Config) err
glog.Infof("Loading client config for namespace controller %q", "namespace-controller")
nsClientset := federationclientset.NewForConfigOrDie(restclient.AddUserAgent(restClientCfg, "namespace-controller"))
namespaceController := namespacecontroller.NewNamespaceController(nsClientset)
namespaceController := namespacecontroller.NewNamespaceController(nsClientset, dynamic.NewDynamicClientPool(restclient.AddUserAgent(restClientCfg, "namespace-controller")))
glog.Infof("Running namespace controller")
namespaceController.Run(wait.NeverStop)

View File

@ -24,6 +24,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
clientv1 "k8s.io/client-go/pkg/api/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
@ -37,6 +38,7 @@ import (
apiv1 "k8s.io/kubernetes/pkg/api/v1"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/namespace/deletion"
"github.com/golang/glog"
)
@ -75,6 +77,9 @@ type NamespaceController struct {
deletionHelper *deletionhelper.DeletionHelper
// Helper to delete all resources in a namespace.
namespacedResourcesDeleter deletion.NamespacedResourcesDeleterInterface
namespaceReviewDelay time.Duration
clusterAvailableDelay time.Duration
smallDelay time.Duration
@ -82,7 +87,7 @@ type NamespaceController struct {
}
// NewNamespaceController returns a new namespace controller
func NewNamespaceController(client federationclientset.Interface) *NamespaceController {
func NewNamespaceController(client federationclientset.Interface, dynamicClientPool dynamic.ClientPool) *NamespaceController {
broadcaster := record.NewBroadcaster()
broadcaster.StartRecordingToSink(eventsink.NewFederatedEventSink(client))
recorder := broadcaster.NewRecorder(api.Scheme, clientv1.EventSource{Component: "federated-namespace-controller"})
@ -180,6 +185,11 @@ func NewNamespaceController(client federationclientset.Interface) *NamespaceCont
nc.namespaceFederatedInformer,
nc.federatedUpdater,
)
discoverResourcesFn := nc.federatedApiClient.Discovery().ServerPreferredNamespacedResources
nc.namespacedResourcesDeleter = deletion.NewNamespacedResourcesDeleter(
client.Core().Namespaces(), dynamicClientPool, nil,
discoverResourcesFn, apiv1.FinalizerKubernetes, false)
return nc
}
@ -463,11 +473,16 @@ func (nc *NamespaceController) delete(namespace *apiv1.Namespace) error {
if nc.hasFinalizerFuncInSpec(updatedNamespace, apiv1.FinalizerKubernetes) {
// Delete resources in this namespace.
updatedNamespace, err = nc.removeKubernetesFinalizer(updatedNamespace)
err = nc.namespacedResourcesDeleter.Delete(updatedNamespace.Name)
if err != nil {
return fmt.Errorf("error in deleting resources in namespace %s: %v", namespace.Name, err)
}
glog.V(2).Infof("Removed kubernetes finalizer from ns %s", namespace.Name)
// Fetch the updated Namespace.
updatedNamespace, err = nc.federatedApiClient.Core().Namespaces().Get(updatedNamespace.Name, metav1.GetOptions{})
if err != nil {
return fmt.Errorf("error in fetching updated namespace %s: %s", updatedNamespace.Name, err)
}
}
// Delete the namespace from all underlying clusters.
@ -487,44 +502,3 @@ func (nc *NamespaceController) delete(namespace *apiv1.Namespace) error {
}
return nil
}
// Ensures that all resources in this namespace are deleted and then removes the kubernetes finalizer.
func (nc *NamespaceController) removeKubernetesFinalizer(namespace *apiv1.Namespace) (*apiv1.Namespace, error) {
// Right now there are just 7 types of objects: Deployments, DaemonSets, ReplicaSet, Secret, Ingress, Events and Service.
// Temporarily these items are simply deleted one by one to squeeze this code into 1.4.
// TODO: Make it generic (like in the regular namespace controller) and parallel.
err := nc.federatedApiClient.Core().Services(namespace.Name).DeleteCollection(&metav1.DeleteOptions{}, metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("failed to delete service list: %v", err)
}
err = nc.federatedApiClient.Extensions().ReplicaSets(namespace.Name).DeleteCollection(&metav1.DeleteOptions{}, metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("failed to delete replicaset list from namespace: %v", err)
}
err = nc.federatedApiClient.Core().Secrets(namespace.Name).DeleteCollection(&metav1.DeleteOptions{}, metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("failed to delete secret list from namespace: %v", err)
}
err = nc.federatedApiClient.Extensions().Ingresses(namespace.Name).DeleteCollection(&metav1.DeleteOptions{}, metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("failed to delete ingresses list from namespace: %v", err)
}
err = nc.federatedApiClient.Extensions().DaemonSets(namespace.Name).DeleteCollection(&metav1.DeleteOptions{}, metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("failed to delete daemonsets list from namespace: %v", err)
}
err = nc.federatedApiClient.Extensions().Deployments(namespace.Name).DeleteCollection(&metav1.DeleteOptions{}, metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("failed to delete deployments list from namespace: %v", err)
}
err = nc.federatedApiClient.Core().Events(namespace.Name).DeleteCollection(&metav1.DeleteOptions{}, metav1.ListOptions{})
if err != nil {
return nil, fmt.Errorf("failed to delete events list from namespace: %v", err)
}
// Remove kube_api.FinalizerKubernetes
if len(namespace.Spec.Finalizers) != 0 {
return nc.removeFinalizerFromSpec(namespace, apiv1.FinalizerKubernetes)
}
return namespace, nil
}

View File

@ -24,6 +24,8 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/dynamic"
restclient "k8s.io/client-go/rest"
core "k8s.io/client-go/testing"
federationapi "k8s.io/kubernetes/federation/apis/federation/v1beta1"
fakefedclientset "k8s.io/kubernetes/federation/client/clientset_generated/federation_clientset/fake"
@ -31,7 +33,6 @@ import (
"k8s.io/kubernetes/federation/pkg/federation-controller/util/deletionhelper"
. "k8s.io/kubernetes/federation/pkg/federation-controller/util/test"
apiv1 "k8s.io/kubernetes/pkg/api/v1"
extensionsv1 "k8s.io/kubernetes/pkg/apis/extensions/v1beta1"
kubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
fakekubeclientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
@ -69,30 +70,8 @@ func TestNamespaceController(t *testing.T) {
RegisterFakeList("namespaces", &cluster2Client.Fake, &apiv1.NamespaceList{Items: []apiv1.Namespace{}})
cluster2CreateChan := RegisterFakeCopyOnCreate("namespaces", &cluster2Client.Fake, cluster2Watch)
RegisterFakeList("replicasets", &fakeClient.Fake, &extensionsv1.ReplicaSetList{Items: []extensionsv1.ReplicaSet{
{
ObjectMeta: metav1.ObjectMeta{
Name: "test-rs",
Namespace: ns1.Namespace,
}}}})
RegisterFakeList("secrets", &fakeClient.Fake, &apiv1.SecretList{Items: []apiv1.Secret{
{
ObjectMeta: metav1.ObjectMeta{
Name: "test-secret",
Namespace: ns1.Namespace,
}}}})
RegisterFakeList("services", &fakeClient.Fake, &apiv1.ServiceList{Items: []apiv1.Service{
{
ObjectMeta: metav1.ObjectMeta{
Name: "test-service",
Namespace: ns1.Namespace,
}}}})
nsDeleteChan := RegisterDelete(&fakeClient.Fake, "namespaces")
rsDeleteChan := RegisterDeleteCollection(&fakeClient.Fake, "replicasets")
serviceDeleteChan := RegisterDeleteCollection(&fakeClient.Fake, "services")
secretDeleteChan := RegisterDeleteCollection(&fakeClient.Fake, "secrets")
namespaceController := NewNamespaceController(fakeClient)
namespaceController := NewNamespaceController(fakeClient, dynamic.NewDynamicClientPool(&restclient.Config{}))
informerClientFactory := func(cluster *federationapi.Cluster) (kubeclientset.Interface, error) {
switch cluster.Name {
case cluster1.Name:
@ -153,10 +132,12 @@ func TestNamespaceController(t *testing.T) {
ns1.DeletionTimestamp = &metav1.Time{Time: time.Now()}
namespaceWatch.Modify(&ns1)
assert.Equal(t, ns1.Name, GetStringFromChan(nsDeleteChan))
assert.Equal(t, "all", GetStringFromChan(rsDeleteChan))
assert.Equal(t, "all", GetStringFromChan(serviceDeleteChan))
assert.Equal(t, "all", GetStringFromChan(secretDeleteChan))
// TODO: Add a test for verifying that resources in the namespace are deleted
// when the namespace is deleted.
// Need a fake dynamic client to mock list and delete actions to be able to test this.
// TODO: Add a fake dynamic client and test this.
// In the meantime, e2e test verify that the resources in a namespace are
// deleted when the namespace is deleted.
close(stop)
}

View File

@ -18,6 +18,7 @@ package deletion
import (
"fmt"
"reflect"
"sync"
"time"
@ -26,10 +27,8 @@ import (
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/v1"
// "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
"k8s.io/client-go/discovery"
@ -37,28 +36,42 @@ import (
v1clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/core/v1"
)
// Interface to delete a namespace with all resources in it.
type NamespacedResourcesDeleterInterface interface {
Delete(nsName string) error
}
func NewNamespacedResourcesDeleter(nsClient v1clientset.NamespaceInterface,
clientPool dynamic.ClientPool, opCache *OperationNotSupportedCache,
clientPool dynamic.ClientPool, podsGetter v1clientset.PodsGetter,
discoverResourcesFn func() ([]*metav1.APIResourceList, error),
finalizerToken v1.FinalizerName, deleteNamespaceWhenDone bool) *NamespacedResourcesDeleter {
return &NamespacedResourcesDeleter{
nsClient: nsClient,
clientPool: clientPool,
opCache: opCache,
finalizerToken v1.FinalizerName, deleteNamespaceWhenDone bool) NamespacedResourcesDeleterInterface {
d := &namespacedResourcesDeleter{
nsClient: nsClient,
clientPool: clientPool,
podsGetter: podsGetter,
opCache: &operationNotSupportedCache{
m: make(map[operationKey]bool),
},
discoverResourcesFn: discoverResourcesFn,
finalizerToken: finalizerToken,
deleteNamespaceWhenDone: deleteNamespaceWhenDone,
}
d.initOpCache()
return d
}
// NamespacedResourcesDeleter is used to delete all resources in a given namespace.
type NamespacedResourcesDeleter struct {
var _ NamespacedResourcesDeleterInterface = &namespacedResourcesDeleter{}
// namespacedResourcesDeleter is used to delete all resources in a given namespace.
type namespacedResourcesDeleter struct {
// Client to manipulate the namespace.
nsClient v1clientset.NamespaceInterface
// Dynamic client to list and delete all namespaced resources.
clientPool dynamic.ClientPool
// Interface to get PodInterface.
podsGetter v1clientset.PodsGetter
// Cache of what operations are not supported on each group version resource.
opCache *OperationNotSupportedCache
opCache *operationNotSupportedCache
discoverResourcesFn func() ([]*metav1.APIResourceList, error)
// The finalizer token that should be removed from the namespace
// when all resources in that namespace have been deleted.
@ -81,7 +94,7 @@ type NamespacedResourcesDeleter struct {
// Returns ResourcesRemainingError if it deleted some resources but needs
// to wait for them to go away.
// Caller is expected to keep calling this until it succeeds.
func (d *NamespacedResourcesDeleter) Delete(nsName string) error {
func (d *namespacedResourcesDeleter) Delete(nsName string) error {
// Multiple controllers may edit a namespace during termination
// first get the latest state of the namespace before proceeding
// if the namespace was deleted already, don't do anything
@ -146,12 +159,46 @@ func (d *NamespacedResourcesDeleter) Delete(nsName string) error {
return nil
}
func (d *namespacedResourcesDeleter) initOpCache() {
// pre-fill opCache with the discovery info
//
// TODO(sttts): get rid of opCache and http 405 logic around it and trust discovery info
resources, err := d.discoverResourcesFn()
if err != nil {
glog.Fatalf("Failed to get supported resources: %v", err)
}
deletableGroupVersionResources := []schema.GroupVersionResource{}
for _, rl := range resources {
gv, err := schema.ParseGroupVersion(rl.GroupVersion)
if err != nil {
glog.Errorf("Failed to parse GroupVersion %q, skipping: %v", rl.GroupVersion, err)
continue
}
for _, r := range rl.APIResources {
gvr := schema.GroupVersionResource{Group: gv.Group, Version: gv.Version, Resource: r.Name}
verbs := sets.NewString([]string(r.Verbs)...)
if !verbs.Has("delete") {
glog.V(6).Infof("Skipping resource %v because it cannot be deleted.", gvr)
}
for _, op := range []operation{operationList, operationDeleteCollection} {
if !verbs.Has(string(op)) {
d.opCache.setNotSupported(operationKey{operation: op, gvr: gvr})
}
}
deletableGroupVersionResources = append(deletableGroupVersionResources, gvr)
}
}
}
// Deletes the given namespace.
func (d *NamespacedResourcesDeleter) deleteNamespace(namespace *v1.Namespace) error {
var opts *v1.DeleteOptions
func (d *namespacedResourcesDeleter) deleteNamespace(namespace *v1.Namespace) error {
var opts *metav1.DeleteOptions
uid := namespace.UID
if len(uid) > 0 {
opts = &v1.DeleteOptions{Preconditions: &v1.Preconditions{UID: &uid}}
opts = &metav1.DeleteOptions{Preconditions: &metav1.Preconditions{UID: &uid}}
}
err := d.nsClient.Delete(namespace.Name, opts)
if err != nil && !errors.IsNotFound(err) {
@ -170,39 +217,39 @@ func (e *ResourcesRemainingError) Error() string {
}
// operation is used for caching if an operation is supported on a dynamic client.
type Operation string
type operation string
const (
OperationDeleteCollection Operation = "deleteCollection"
OperationList Operation = "list"
operationDeleteCollection operation = "deletecollection"
operationList operation = "list"
// assume a default estimate for finalizers to complete when found on items pending deletion.
finalizerEstimateSeconds int64 = int64(15)
)
// operationKey is an entry in a cache.
type OperationKey struct {
Operation Operation
Gvr schema.GroupVersionResource
type operationKey struct {
operation operation
gvr schema.GroupVersionResource
}
// operationNotSupportedCache is a simple cache to remember if an operation is not supported for a resource.
// if the operationKey maps to true, it means the operation is not supported.
type OperationNotSupportedCache struct {
type operationNotSupportedCache struct {
lock sync.RWMutex
M map[OperationKey]bool
m map[operationKey]bool
}
// isSupported returns true if the operation is supported
func (o *OperationNotSupportedCache) isSupported(key OperationKey) bool {
func (o *operationNotSupportedCache) isSupported(key operationKey) bool {
o.lock.RLock()
defer o.lock.RUnlock()
return !o.M[key]
return !o.m[key]
}
func (o *OperationNotSupportedCache) SetNotSupported(key OperationKey) {
func (o *operationNotSupportedCache) setNotSupported(key operationKey) {
o.lock.Lock()
defer o.lock.Unlock()
o.M[key] = true
o.m[key] = true
}
// updateNamespaceFunc is a function that makes an update to a namespace
@ -211,7 +258,7 @@ type updateNamespaceFunc func(namespace *v1.Namespace) (*v1.Namespace, error)
// retryOnConflictError retries the specified fn if there was a conflict error
// it will return an error if the UID for an object changes across retry operations.
// TODO RetryOnConflict should be a generic concept in client code
func (d *NamespacedResourcesDeleter) retryOnConflictError(namespace *v1.Namespace, fn updateNamespaceFunc) (result *v1.Namespace, err error) {
func (d *namespacedResourcesDeleter) retryOnConflictError(namespace *v1.Namespace, fn updateNamespaceFunc) (result *v1.Namespace, err error) {
latestNamespace := namespace
for {
result, err = fn(latestNamespace)
@ -222,7 +269,6 @@ func (d *NamespacedResourcesDeleter) retryOnConflictError(namespace *v1.Namespac
return nil, err
}
prevNamespace := latestNamespace
// latestNamespace, err = kubeClient.Core().Namespaces().Get(latestNamespace.Name, metav1.GetOptions{})
latestNamespace, err = d.nsClient.Get(latestNamespace.Name, metav1.GetOptions{})
if err != nil {
return nil, err
@ -234,7 +280,7 @@ func (d *NamespacedResourcesDeleter) retryOnConflictError(namespace *v1.Namespac
}
// updateNamespaceStatusFunc will verify that the status of the namespace is correct
func (d *NamespacedResourcesDeleter) updateNamespaceStatusFunc(namespace *v1.Namespace) (*v1.Namespace, error) {
func (d *namespacedResourcesDeleter) updateNamespaceStatusFunc(namespace *v1.Namespace) (*v1.Namespace, error) {
if namespace.DeletionTimestamp.IsZero() || namespace.Status.Phase == v1.NamespaceTerminating {
return namespace, nil
}
@ -251,7 +297,7 @@ func finalized(namespace *v1.Namespace) bool {
}
// finalizeNamespace removes the specified finalizerToken and finalizes the namespace
func (d *NamespacedResourcesDeleter) finalizeNamespace(namespace *v1.Namespace) (*v1.Namespace, error) {
func (d *namespacedResourcesDeleter) finalizeNamespace(namespace *v1.Namespace) (*v1.Namespace, error) {
namespaceFinalize := v1.Namespace{}
namespaceFinalize.ObjectMeta = namespace.ObjectMeta
namespaceFinalize.Spec = namespace.Spec
@ -278,12 +324,12 @@ func (d *NamespacedResourcesDeleter) finalizeNamespace(namespace *v1.Namespace)
// deleteCollection is a helper function that will delete the collection of resources
// it returns true if the operation was supported on the server.
// it returns an error if the operation was supported on the server but was unable to complete.
func (d *NamespacedResourcesDeleter) deleteCollection(
func (d *namespacedResourcesDeleter) deleteCollection(
dynamicClient *dynamic.Client, gvr schema.GroupVersionResource,
namespace string) (bool, error) {
glog.V(5).Infof("namespace controller - deleteCollection - namespace: %s, gvr: %v", namespace, gvr)
key := OperationKey{Operation: OperationDeleteCollection, Gvr: gvr}
key := operationKey{operation: operationDeleteCollection, gvr: gvr}
if !d.opCache.isSupported(key) {
glog.V(5).Infof("namespace controller - deleteCollection ignored since not supported - namespace: %s, gvr: %v", namespace, gvr)
return false, nil
@ -309,7 +355,7 @@ func (d *NamespacedResourcesDeleter) deleteCollection(
// remember next time that this resource does not support delete collection...
if errors.IsMethodNotSupported(err) || errors.IsNotFound(err) {
glog.V(5).Infof("namespace controller - deleteCollection not supported - namespace: %s, gvr: %v", namespace, gvr)
d.opCache.SetNotSupported(key)
d.opCache.setNotSupported(key)
return false, nil
}
@ -322,11 +368,11 @@ func (d *NamespacedResourcesDeleter) deleteCollection(
// the list of items in the collection (if found)
// a boolean if the operation is supported
// an error if the operation is supported but could not be completed.
func (d *NamespacedResourcesDeleter) listCollection(
func (d *namespacedResourcesDeleter) listCollection(
dynamicClient *dynamic.Client, gvr schema.GroupVersionResource, namespace string) (*unstructured.UnstructuredList, bool, error) {
glog.V(5).Infof("namespace controller - listCollection - namespace: %s, gvr: %v", namespace, gvr)
key := OperationKey{Operation: OperationList, Gvr: gvr}
key := operationKey{operation: operationList, gvr: gvr}
if !d.opCache.isSupported(key) {
glog.V(5).Infof("namespace controller - listCollection ignored since not supported - namespace: %s, gvr: %v", namespace, gvr)
return nil, false, nil
@ -350,7 +396,7 @@ func (d *NamespacedResourcesDeleter) listCollection(
// remember next time that this resource does not support delete collection...
if errors.IsMethodNotSupported(err) || errors.IsNotFound(err) {
glog.V(5).Infof("namespace controller - listCollection not supported - namespace: %s, gvr: %v", namespace, gvr)
d.opCache.SetNotSupported(key)
d.opCache.setNotSupported(key)
return nil, false, nil
}
@ -358,7 +404,7 @@ func (d *NamespacedResourcesDeleter) listCollection(
}
// deleteEachItem is a helper function that will list the collection of resources and delete each item 1 by 1.
func (d *NamespacedResourcesDeleter) deleteEachItem(
func (d *namespacedResourcesDeleter) deleteEachItem(
dynamicClient *dynamic.Client, gvr schema.GroupVersionResource, namespace string) error {
glog.V(5).Infof("namespace controller - deleteEachItem - namespace: %s, gvr: %v", namespace, gvr)
@ -381,7 +427,7 @@ func (d *NamespacedResourcesDeleter) deleteEachItem(
// deleteAllContentForGroupVersionResource will use the dynamic client to delete each resource identified in gvr.
// It returns an estimate of the time remaining before the remaining resources are deleted.
// If estimate > 0, not all resources are guaranteed to be gone.
func (d *NamespacedResourcesDeleter) deleteAllContentForGroupVersionResource(
func (d *namespacedResourcesDeleter) deleteAllContentForGroupVersionResource(
gvr schema.GroupVersionResource, namespace string,
namespaceDeletedAt metav1.Time) (int64, error) {
glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - namespace: %s, gvr: %v", namespace, gvr)
@ -444,7 +490,7 @@ func (d *NamespacedResourcesDeleter) deleteAllContentForGroupVersionResource(
// deleteAllContent will use the dynamic client to delete each resource identified in groupVersionResources.
// It returns an estimate of the time remaining before the remaining resources are deleted.
// If estimate > 0, not all resources are guaranteed to be gone.
func (d *NamespacedResourcesDeleter) deleteAllContent(
func (d *namespacedResourcesDeleter) deleteAllContent(
namespace string, namespaceDeletedAt metav1.Time) (int64, error) {
estimate := int64(0)
glog.V(4).Infof("namespace controller - deleteAllContent - namespace: %s", namespace)
@ -472,18 +518,14 @@ func (d *NamespacedResourcesDeleter) deleteAllContent(
}
// estimateGrracefulTermination will estimate the graceful termination required for the specific entity in the namespace
func (d *NamespacedResourcesDeleter) estimateGracefulTermination(gvr schema.GroupVersionResource, ns string, namespaceDeletedAt metav1.Time) (int64, error) {
func (d *namespacedResourcesDeleter) estimateGracefulTermination(gvr schema.GroupVersionResource, ns string, namespaceDeletedAt metav1.Time) (int64, error) {
groupResource := gvr.GroupResource()
glog.V(5).Infof("namespace controller - estimateGracefulTermination - group %s, resource: %s", groupResource.Group, groupResource.Resource)
estimate := int64(0)
var err error
switch groupResource {
case schema.GroupResource{Group: "", Resource: "pods"}:
dynamicClient, err := d.clientPool.ClientForGroupVersionResource(gvr)
if err != nil {
return estimate, fmt.Errorf("error in creating dynamic client for resource: %s", gvr)
}
estimate, err = d.estimateGracefulTerminationForPods(dynamicClient, ns)
estimate, err = d.estimateGracefulTerminationForPods(ns)
}
if err != nil {
return estimate, err
@ -498,27 +540,19 @@ func (d *NamespacedResourcesDeleter) estimateGracefulTermination(gvr schema.Grou
}
// estimateGracefulTerminationForPods determines the graceful termination period for pods in the namespace
func (d *NamespacedResourcesDeleter) estimateGracefulTerminationForPods(dynamicClient *dynamic.Client, ns string) (int64, error) {
func (d *namespacedResourcesDeleter) estimateGracefulTerminationForPods(ns string) (int64, error) {
glog.V(5).Infof("namespace controller - estimateGracefulTerminationForPods - namespace %s", ns)
estimate := int64(0)
resource := &metav1.APIResource{
Namespaced: true,
Kind: "Pod",
podsGetter := d.podsGetter
if podsGetter == nil || reflect.ValueOf(podsGetter).IsNil() {
return estimate, fmt.Errorf("unexpected: podsGetter is nil. Cannot estimate grace period seconds for pods")
}
listResponse, err := dynamicClient.Resource(resource, ns).List(&metav1.ListOptions{})
items, err := podsGetter.Pods(ns).List(metav1.ListOptions{})
if err != nil {
return estimate, err
}
items, ok := listResponse.(*unstructured.UnstructuredList)
if !ok {
return estimate, fmt.Errorf("unexpected: expected type unstructured.UnstructuredList, got: %#v", listResponse)
}
for i := range items.Items {
item := items.Items[i]
pod, err := unstructuredToPod(item)
if err != nil {
return estimate, fmt.Errorf("unexpected: expected type v1.Pod, got: %#v", item)
}
pod := items.Items[i]
// filter out terminal pods
phase := pod.Status.Phase
if v1.PodSucceeded == phase || v1.PodFailed == phase {
@ -533,13 +567,3 @@ func (d *NamespacedResourcesDeleter) estimateGracefulTerminationForPods(dynamicC
}
return estimate, nil
}
func unstructuredToPod(obj *unstructured.Unstructured) (*v1.Pod, error) {
json, err := runtime.Encode(unstructured.UnstructuredJSONScheme, obj)
if err != nil {
return nil, err
}
pod := new(v1.Pod)
err = runtime.DecodeInto(api.Codecs.UniversalDecoder(), json, pod)
return pod, err
}

View File

@ -65,7 +65,7 @@ func TestFinalizeNamespaceFunc(t *testing.T) {
Finalizers: []v1.FinalizerName{"kubernetes", "other"},
},
}
d := NamespacedResourcesDeleter{
d := namespacedResourcesDeleter{
nsClient: mockClient.Core().Namespaces(),
finalizerToken: v1.FinalizerKubernetes,
}
@ -130,11 +130,6 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *metav1.APIVersio
dynamicClientActionSet.Insert((&fakeAction{method: "GET", path: urlPath}).String())
dynamicClientActionSet.Insert((&fakeAction{method: "DELETE", path: urlPath}).String())
}
// One additional GET for listing pods (to estimate graceful deletion).
urlPath := path.Join([]string{
dynamic.LegacyAPIPathResolverFunc(schema.GroupVersionKind{Group: "", Version: "v1"}),
"", "v1", "namespaces", namespaceName}...)
dynamicClientActionSet.Insert((&fakeAction{method: "GET", path: urlPath}).String())
scenarios := map[string]struct {
testNamespace *v1.Namespace
@ -147,6 +142,7 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *metav1.APIVersio
kubeClientActionSet: sets.NewString(
strings.Join([]string{"get", "namespaces", ""}, "-"),
strings.Join([]string{"create", "namespaces", "finalize"}, "-"),
strings.Join([]string{"list", "pods", ""}, "-"),
strings.Join([]string{"delete", "namespaces", ""}, "-"),
),
dynamicClientActionSet: dynamicClientActionSet,
@ -181,7 +177,7 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *metav1.APIVersio
fn := func() ([]*metav1.APIResourceList, error) {
return resources, nil
}
d := NewNamespacedResourcesDeleter(mockClient.Core().Namespaces(), clientPool, &OperationNotSupportedCache{M: make(map[OperationKey]bool)}, fn, v1.FinalizerKubernetes, true)
d := NewNamespacedResourcesDeleter(mockClient.Core().Namespaces(), clientPool, mockClient.Core(), fn, v1.FinalizerKubernetes, true)
err := d.Delete(testInput.testNamespace.Name)
if err != nil {
t.Errorf("scenario %s - Unexpected error when synching namespace %v", scenario, err)
@ -220,7 +216,7 @@ func TestRetryOnConflictError(t *testing.T) {
return namespace, nil
}
namespace := &v1.Namespace{}
d := NamespacedResourcesDeleter{
d := namespacedResourcesDeleter{
nsClient: mockClient.Core().Namespaces(),
}
_, err := d.retryOnConflictError(namespace, retryOnce)
@ -257,9 +253,8 @@ func TestSyncNamespaceThatIsActive(t *testing.T) {
fn := func() ([]*metav1.APIResourceList, error) {
return testResources(), nil
}
//err := syncNamespace(mockClient, nil, &operationNotSupportedCache{m: make(map[operationKey]bool)}, fn, testNamespace, v1.FinalizerKubernetes)
d := NewNamespacedResourcesDeleter(mockClient.Core().Namespaces(), nil,
&OperationNotSupportedCache{M: make(map[OperationKey]bool)}, fn, v1.FinalizerKubernetes, true)
d := NewNamespacedResourcesDeleter(mockClient.Core().Namespaces(), nil, mockClient.Core(),
fn, v1.FinalizerKubernetes, true)
err := d.Delete(testNamespace.Name)
if err != nil {
t.Errorf("Unexpected error when synching namespace %v", err)

View File

@ -21,9 +21,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
"k8s.io/client-go/dynamic"
@ -62,12 +60,10 @@ type NamespaceController struct {
queue workqueue.RateLimitingInterface
// function to list of preferred resources for namespace deletion
discoverResourcesFn func() ([]*metav1.APIResourceList, error)
// opCache is a cache to remember if a particular operation is not supported to aid dynamic client.
opCache *deletion.OperationNotSupportedCache
// finalizerToken is the finalizer token managed by this controller
finalizerToken v1.FinalizerName
// helper to delete all resources in the namespace when the namespace is deleted.
namespacedResourcesDeleter *deletion.NamespacedResourcesDeleter
namespacedResourcesDeleter deletion.NamespacedResourcesDeleterInterface
}
// NewNamespaceController creates a new NamespaceController
@ -78,52 +74,14 @@ func NewNamespaceController(
resyncPeriod time.Duration,
finalizerToken v1.FinalizerName) *NamespaceController {
opCache := &deletion.OperationNotSupportedCache{
M: make(map[deletion.OperationKey]bool),
}
// pre-fill opCache with the discovery info
//
// TODO(sttts): get rid of opCache and http 405 logic around it and trust discovery info
resources, err := discoverResourcesFn()
if err != nil {
glog.Fatalf("Failed to get supported resources: %v", err)
}
deletableGroupVersionResources := []schema.GroupVersionResource{}
for _, rl := range resources {
gv, err := schema.ParseGroupVersion(rl.GroupVersion)
if err != nil {
glog.Errorf("Failed to parse GroupVersion %q, skipping: %v", rl.GroupVersion, err)
continue
}
for _, r := range rl.APIResources {
gvr := schema.GroupVersionResource{Group: gv.Group, Version: gv.Version, Resource: r.Name}
verbs := sets.NewString([]string(r.Verbs)...)
if !verbs.Has("delete") {
glog.V(6).Infof("Skipping resource %v because it cannot be deleted.", gvr)
}
for _, op := range []deletion.Operation{deletion.OperationList, deletion.OperationDeleteCollection} {
if !verbs.Has(string(op)) {
opCache.SetNotSupported(deletion.OperationKey{Operation: op, Gvr: gvr})
}
}
deletableGroupVersionResources = append(deletableGroupVersionResources, gvr)
}
}
// create the controller so we can inject the enqueue function
namespaceController := &NamespaceController{
kubeClient: kubeClient,
clientPool: clientPool,
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "namespace"),
discoverResourcesFn: discoverResourcesFn,
opCache: opCache,
finalizerToken: finalizerToken,
namespacedResourcesDeleter: deletion.NewNamespacedResourcesDeleter(kubeClient.Core().Namespaces(), clientPool, opCache, discoverResourcesFn, finalizerToken, true),
namespacedResourcesDeleter: deletion.NewNamespacedResourcesDeleter(kubeClient.Core().Namespaces(), clientPool, kubeClient.Core(), discoverResourcesFn, finalizerToken, true),
}
if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil {

View File

@ -21,6 +21,7 @@ import (
"k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/pkg/api"
restclient "k8s.io/client-go/rest"
)
@ -70,6 +71,13 @@ func NewClientPool(config *restclient.Config, mapper meta.RESTMapper, apiPathRes
}
}
// Instantiates a new dynamic client pool with the given config.
func NewDynamicClientPool(cfg *restclient.Config) ClientPool {
// TODO: should use a dynamic RESTMapper built from the discovery results.
restMapper := api.Registry.RESTMapper()
return NewClientPool(cfg, restMapper, LegacyAPIPathResolverFunc)
}
// ClientForGroupVersionResource uses the provided RESTMapper to identify the appropriate resource. Resource may
// be empty. If no matching kind is found the underlying client for that group is still returned.
func (c *clientPoolImpl) ClientForGroupVersionResource(resource schema.GroupVersionResource) (*Client, error) {