mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 04:33:26 +00:00
Refactorying namespace deletion code to enable reuse with federation namespace controller
This commit is contained in:
parent
a461eab321
commit
c1c2a12134
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
|
|||||||
limitations under the License.
|
limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package namespace
|
package deletion
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
@ -26,69 +26,195 @@ import (
|
|||||||
"k8s.io/apimachinery/pkg/api/errors"
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||||
|
"k8s.io/apimachinery/pkg/runtime"
|
||||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||||
"k8s.io/apimachinery/pkg/util/sets"
|
"k8s.io/apimachinery/pkg/util/sets"
|
||||||
|
"k8s.io/kubernetes/pkg/api"
|
||||||
|
"k8s.io/kubernetes/pkg/api/v1"
|
||||||
|
// "k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
||||||
"k8s.io/client-go/discovery"
|
"k8s.io/client-go/discovery"
|
||||||
"k8s.io/client-go/dynamic"
|
"k8s.io/client-go/dynamic"
|
||||||
"k8s.io/kubernetes/pkg/api/v1"
|
v1clientset "k8s.io/kubernetes/pkg/client/clientset_generated/clientset/typed/core/v1"
|
||||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// contentRemainingError is used to inform the caller that content is not fully removed from the namespace
|
func NewNamespacedResourcesDeleter(nsClient v1clientset.NamespaceInterface,
|
||||||
type contentRemainingError struct {
|
clientPool dynamic.ClientPool, opCache *OperationNotSupportedCache,
|
||||||
|
discoverResourcesFn func() ([]*metav1.APIResourceList, error),
|
||||||
|
finalizerToken v1.FinalizerName, deleteNamespaceWhenDone bool) *NamespacedResourcesDeleter {
|
||||||
|
return &NamespacedResourcesDeleter{
|
||||||
|
nsClient: nsClient,
|
||||||
|
clientPool: clientPool,
|
||||||
|
opCache: opCache,
|
||||||
|
discoverResourcesFn: discoverResourcesFn,
|
||||||
|
finalizerToken: finalizerToken,
|
||||||
|
deleteNamespaceWhenDone: deleteNamespaceWhenDone,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// NamespacedResourcesDeleter is used to delete all resources in a given namespace.
|
||||||
|
type NamespacedResourcesDeleter struct {
|
||||||
|
// Client to manipulate the namespace.
|
||||||
|
nsClient v1clientset.NamespaceInterface
|
||||||
|
// Dynamic client to list and delete all namespaced resources.
|
||||||
|
clientPool dynamic.ClientPool
|
||||||
|
// Cache of what operations are not supported on each group version resource.
|
||||||
|
opCache *OperationNotSupportedCache
|
||||||
|
discoverResourcesFn func() ([]*metav1.APIResourceList, error)
|
||||||
|
// The finalizer token that should be removed from the namespace
|
||||||
|
// when all resources in that namespace have been deleted.
|
||||||
|
finalizerToken v1.FinalizerName
|
||||||
|
// Also delete the namespace when all resources in the namespace have been deleted.
|
||||||
|
deleteNamespaceWhenDone bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete deletes all resources in the given namespace.
|
||||||
|
// Before deleting resources:
|
||||||
|
// * It ensures that deletion timestamp is set on the
|
||||||
|
// namespace (does nothing if deletion timestamp is missing).
|
||||||
|
// * Verifies that the namespace is in the "terminating" phase
|
||||||
|
// (updates the namespace phase if it is not yet marked terminating)
|
||||||
|
// After deleting the resources:
|
||||||
|
// * It removes finalizer token from the given namespace.
|
||||||
|
// * Deletes the namespace if deleteNamespaceWhenDone is true.
|
||||||
|
//
|
||||||
|
// Returns an error if any of those steps fail.
|
||||||
|
// Returns ResourcesRemainingError if it deleted some resources but needs
|
||||||
|
// to wait for them to go away.
|
||||||
|
// Caller is expected to keep calling this until it succeeds.
|
||||||
|
func (d *NamespacedResourcesDeleter) Delete(nsName string) error {
|
||||||
|
// Multiple controllers may edit a namespace during termination
|
||||||
|
// first get the latest state of the namespace before proceeding
|
||||||
|
// if the namespace was deleted already, don't do anything
|
||||||
|
namespace, err := d.nsClient.Get(nsName, metav1.GetOptions{})
|
||||||
|
if err != nil {
|
||||||
|
if errors.IsNotFound(err) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if namespace.DeletionTimestamp == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
glog.V(5).Infof("namespace controller - syncNamespace - namespace: %s, finalizerToken: %s", namespace.Name, d.finalizerToken)
|
||||||
|
|
||||||
|
// ensure that the status is up to date on the namespace
|
||||||
|
// if we get a not found error, we assume the namespace is truly gone
|
||||||
|
namespace, err = d.retryOnConflictError(namespace, d.updateNamespaceStatusFunc)
|
||||||
|
if err != nil {
|
||||||
|
if errors.IsNotFound(err) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// the latest view of the namespace asserts that namespace is no longer deleting..
|
||||||
|
if namespace.DeletionTimestamp.IsZero() {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Delete the namespace if it is already finalized.
|
||||||
|
if d.deleteNamespaceWhenDone && finalized(namespace) {
|
||||||
|
return d.deleteNamespace(namespace)
|
||||||
|
}
|
||||||
|
|
||||||
|
// there may still be content for us to remove
|
||||||
|
estimate, err := d.deleteAllContent(namespace.Name, *namespace.DeletionTimestamp)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if estimate > 0 {
|
||||||
|
return &ResourcesRemainingError{estimate}
|
||||||
|
}
|
||||||
|
|
||||||
|
// we have removed content, so mark it finalized by us
|
||||||
|
namespace, err = d.retryOnConflictError(namespace, d.finalizeNamespace)
|
||||||
|
if err != nil {
|
||||||
|
// in normal practice, this should not be possible, but if a deployment is running
|
||||||
|
// two controllers to do namespace deletion that share a common finalizer token it's
|
||||||
|
// possible that a not found could occur since the other controller would have finished the delete.
|
||||||
|
if errors.IsNotFound(err) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if we can delete now.
|
||||||
|
if d.deleteNamespaceWhenDone && finalized(namespace) {
|
||||||
|
return d.deleteNamespace(namespace)
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Deletes the given namespace.
|
||||||
|
func (d *NamespacedResourcesDeleter) deleteNamespace(namespace *v1.Namespace) error {
|
||||||
|
var opts *v1.DeleteOptions
|
||||||
|
uid := namespace.UID
|
||||||
|
if len(uid) > 0 {
|
||||||
|
opts = &v1.DeleteOptions{Preconditions: &v1.Preconditions{UID: &uid}}
|
||||||
|
}
|
||||||
|
err := d.nsClient.Delete(namespace.Name, opts)
|
||||||
|
if err != nil && !errors.IsNotFound(err) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// ResourcesRemainingError is used to inform the caller that all resources are not yet fully removed from the namespace.
|
||||||
|
type ResourcesRemainingError struct {
|
||||||
Estimate int64
|
Estimate int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func (e *contentRemainingError) Error() string {
|
func (e *ResourcesRemainingError) Error() string {
|
||||||
return fmt.Sprintf("some content remains in the namespace, estimate %d seconds before it is removed", e.Estimate)
|
return fmt.Sprintf("some content remains in the namespace, estimate %d seconds before it is removed", e.Estimate)
|
||||||
}
|
}
|
||||||
|
|
||||||
// operation is used for caching if an operation is supported on a dynamic client.
|
// operation is used for caching if an operation is supported on a dynamic client.
|
||||||
type operation string
|
type Operation string
|
||||||
|
|
||||||
const (
|
const (
|
||||||
operationDeleteCollection operation = "deleteCollection"
|
OperationDeleteCollection Operation = "deleteCollection"
|
||||||
operationList operation = "list"
|
OperationList Operation = "list"
|
||||||
// assume a default estimate for finalizers to complete when found on items pending deletion.
|
// assume a default estimate for finalizers to complete when found on items pending deletion.
|
||||||
finalizerEstimateSeconds int64 = int64(15)
|
finalizerEstimateSeconds int64 = int64(15)
|
||||||
)
|
)
|
||||||
|
|
||||||
// operationKey is an entry in a cache.
|
// operationKey is an entry in a cache.
|
||||||
type operationKey struct {
|
type OperationKey struct {
|
||||||
op operation
|
Operation Operation
|
||||||
gvr schema.GroupVersionResource
|
Gvr schema.GroupVersionResource
|
||||||
}
|
}
|
||||||
|
|
||||||
// operationNotSupportedCache is a simple cache to remember if an operation is not supported for a resource.
|
// operationNotSupportedCache is a simple cache to remember if an operation is not supported for a resource.
|
||||||
// if the operationKey maps to true, it means the operation is not supported.
|
// if the operationKey maps to true, it means the operation is not supported.
|
||||||
type operationNotSupportedCache struct {
|
type OperationNotSupportedCache struct {
|
||||||
lock sync.RWMutex
|
lock sync.RWMutex
|
||||||
m map[operationKey]bool
|
M map[OperationKey]bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// isSupported returns true if the operation is supported
|
// isSupported returns true if the operation is supported
|
||||||
func (o *operationNotSupportedCache) isSupported(key operationKey) bool {
|
func (o *OperationNotSupportedCache) isSupported(key OperationKey) bool {
|
||||||
o.lock.RLock()
|
o.lock.RLock()
|
||||||
defer o.lock.RUnlock()
|
defer o.lock.RUnlock()
|
||||||
return !o.m[key]
|
return !o.M[key]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (o *operationNotSupportedCache) setNotSupported(key operationKey) {
|
func (o *OperationNotSupportedCache) SetNotSupported(key OperationKey) {
|
||||||
o.lock.Lock()
|
o.lock.Lock()
|
||||||
defer o.lock.Unlock()
|
defer o.lock.Unlock()
|
||||||
o.m[key] = true
|
o.M[key] = true
|
||||||
}
|
}
|
||||||
|
|
||||||
// updateNamespaceFunc is a function that makes an update to a namespace
|
// updateNamespaceFunc is a function that makes an update to a namespace
|
||||||
type updateNamespaceFunc func(kubeClient clientset.Interface, namespace *v1.Namespace) (*v1.Namespace, error)
|
type updateNamespaceFunc func(namespace *v1.Namespace) (*v1.Namespace, error)
|
||||||
|
|
||||||
// retryOnConflictError retries the specified fn if there was a conflict error
|
// retryOnConflictError retries the specified fn if there was a conflict error
|
||||||
// it will return an error if the UID for an object changes across retry operations.
|
// it will return an error if the UID for an object changes across retry operations.
|
||||||
// TODO RetryOnConflict should be a generic concept in client code
|
// TODO RetryOnConflict should be a generic concept in client code
|
||||||
func retryOnConflictError(kubeClient clientset.Interface, namespace *v1.Namespace, fn updateNamespaceFunc) (result *v1.Namespace, err error) {
|
func (d *NamespacedResourcesDeleter) retryOnConflictError(namespace *v1.Namespace, fn updateNamespaceFunc) (result *v1.Namespace, err error) {
|
||||||
latestNamespace := namespace
|
latestNamespace := namespace
|
||||||
for {
|
for {
|
||||||
result, err = fn(kubeClient, latestNamespace)
|
result, err = fn(latestNamespace)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
@ -96,7 +222,8 @@ func retryOnConflictError(kubeClient clientset.Interface, namespace *v1.Namespac
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
prevNamespace := latestNamespace
|
prevNamespace := latestNamespace
|
||||||
latestNamespace, err = kubeClient.Core().Namespaces().Get(latestNamespace.Name, metav1.GetOptions{})
|
// latestNamespace, err = kubeClient.Core().Namespaces().Get(latestNamespace.Name, metav1.GetOptions{})
|
||||||
|
latestNamespace, err = d.nsClient.Get(latestNamespace.Name, metav1.GetOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -107,7 +234,7 @@ func retryOnConflictError(kubeClient clientset.Interface, namespace *v1.Namespac
|
|||||||
}
|
}
|
||||||
|
|
||||||
// updateNamespaceStatusFunc will verify that the status of the namespace is correct
|
// updateNamespaceStatusFunc will verify that the status of the namespace is correct
|
||||||
func updateNamespaceStatusFunc(kubeClient clientset.Interface, namespace *v1.Namespace) (*v1.Namespace, error) {
|
func (d *NamespacedResourcesDeleter) updateNamespaceStatusFunc(namespace *v1.Namespace) (*v1.Namespace, error) {
|
||||||
if namespace.DeletionTimestamp.IsZero() || namespace.Status.Phase == v1.NamespaceTerminating {
|
if namespace.DeletionTimestamp.IsZero() || namespace.Status.Phase == v1.NamespaceTerminating {
|
||||||
return namespace, nil
|
return namespace, nil
|
||||||
}
|
}
|
||||||
@ -115,7 +242,7 @@ func updateNamespaceStatusFunc(kubeClient clientset.Interface, namespace *v1.Nam
|
|||||||
newNamespace.ObjectMeta = namespace.ObjectMeta
|
newNamespace.ObjectMeta = namespace.ObjectMeta
|
||||||
newNamespace.Status = namespace.Status
|
newNamespace.Status = namespace.Status
|
||||||
newNamespace.Status.Phase = v1.NamespaceTerminating
|
newNamespace.Status.Phase = v1.NamespaceTerminating
|
||||||
return kubeClient.Core().Namespaces().UpdateStatus(&newNamespace)
|
return d.nsClient.UpdateStatus(&newNamespace)
|
||||||
}
|
}
|
||||||
|
|
||||||
// finalized returns true if the namespace.Spec.Finalizers is an empty list
|
// finalized returns true if the namespace.Spec.Finalizers is an empty list
|
||||||
@ -123,21 +250,14 @@ func finalized(namespace *v1.Namespace) bool {
|
|||||||
return len(namespace.Spec.Finalizers) == 0
|
return len(namespace.Spec.Finalizers) == 0
|
||||||
}
|
}
|
||||||
|
|
||||||
// finalizeNamespaceFunc returns a function that knows how to finalize a namespace for specified token.
|
|
||||||
func finalizeNamespaceFunc(finalizerToken v1.FinalizerName) updateNamespaceFunc {
|
|
||||||
return func(kubeClient clientset.Interface, namespace *v1.Namespace) (*v1.Namespace, error) {
|
|
||||||
return finalizeNamespace(kubeClient, namespace, finalizerToken)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// finalizeNamespace removes the specified finalizerToken and finalizes the namespace
|
// finalizeNamespace removes the specified finalizerToken and finalizes the namespace
|
||||||
func finalizeNamespace(kubeClient clientset.Interface, namespace *v1.Namespace, finalizerToken v1.FinalizerName) (*v1.Namespace, error) {
|
func (d *NamespacedResourcesDeleter) finalizeNamespace(namespace *v1.Namespace) (*v1.Namespace, error) {
|
||||||
namespaceFinalize := v1.Namespace{}
|
namespaceFinalize := v1.Namespace{}
|
||||||
namespaceFinalize.ObjectMeta = namespace.ObjectMeta
|
namespaceFinalize.ObjectMeta = namespace.ObjectMeta
|
||||||
namespaceFinalize.Spec = namespace.Spec
|
namespaceFinalize.Spec = namespace.Spec
|
||||||
finalizerSet := sets.NewString()
|
finalizerSet := sets.NewString()
|
||||||
for i := range namespace.Spec.Finalizers {
|
for i := range namespace.Spec.Finalizers {
|
||||||
if namespace.Spec.Finalizers[i] != finalizerToken {
|
if namespace.Spec.Finalizers[i] != d.finalizerToken {
|
||||||
finalizerSet.Insert(string(namespace.Spec.Finalizers[i]))
|
finalizerSet.Insert(string(namespace.Spec.Finalizers[i]))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -145,7 +265,7 @@ func finalizeNamespace(kubeClient clientset.Interface, namespace *v1.Namespace,
|
|||||||
for _, value := range finalizerSet.List() {
|
for _, value := range finalizerSet.List() {
|
||||||
namespaceFinalize.Spec.Finalizers = append(namespaceFinalize.Spec.Finalizers, v1.FinalizerName(value))
|
namespaceFinalize.Spec.Finalizers = append(namespaceFinalize.Spec.Finalizers, v1.FinalizerName(value))
|
||||||
}
|
}
|
||||||
namespace, err := kubeClient.Core().Namespaces().Finalize(&namespaceFinalize)
|
namespace, err := d.nsClient.Finalize(&namespaceFinalize)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// it was removed already, so life is good
|
// it was removed already, so life is good
|
||||||
if errors.IsNotFound(err) {
|
if errors.IsNotFound(err) {
|
||||||
@ -158,16 +278,13 @@ func finalizeNamespace(kubeClient clientset.Interface, namespace *v1.Namespace,
|
|||||||
// deleteCollection is a helper function that will delete the collection of resources
|
// deleteCollection is a helper function that will delete the collection of resources
|
||||||
// it returns true if the operation was supported on the server.
|
// it returns true if the operation was supported on the server.
|
||||||
// it returns an error if the operation was supported on the server but was unable to complete.
|
// it returns an error if the operation was supported on the server but was unable to complete.
|
||||||
func deleteCollection(
|
func (d *NamespacedResourcesDeleter) deleteCollection(
|
||||||
dynamicClient *dynamic.Client,
|
dynamicClient *dynamic.Client, gvr schema.GroupVersionResource,
|
||||||
opCache *operationNotSupportedCache,
|
namespace string) (bool, error) {
|
||||||
gvr schema.GroupVersionResource,
|
|
||||||
namespace string,
|
|
||||||
) (bool, error) {
|
|
||||||
glog.V(5).Infof("namespace controller - deleteCollection - namespace: %s, gvr: %v", namespace, gvr)
|
glog.V(5).Infof("namespace controller - deleteCollection - namespace: %s, gvr: %v", namespace, gvr)
|
||||||
|
|
||||||
key := operationKey{op: operationDeleteCollection, gvr: gvr}
|
key := OperationKey{Operation: OperationDeleteCollection, Gvr: gvr}
|
||||||
if !opCache.isSupported(key) {
|
if !d.opCache.isSupported(key) {
|
||||||
glog.V(5).Infof("namespace controller - deleteCollection ignored since not supported - namespace: %s, gvr: %v", namespace, gvr)
|
glog.V(5).Infof("namespace controller - deleteCollection ignored since not supported - namespace: %s, gvr: %v", namespace, gvr)
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
@ -192,7 +309,7 @@ func deleteCollection(
|
|||||||
// remember next time that this resource does not support delete collection...
|
// remember next time that this resource does not support delete collection...
|
||||||
if errors.IsMethodNotSupported(err) || errors.IsNotFound(err) {
|
if errors.IsMethodNotSupported(err) || errors.IsNotFound(err) {
|
||||||
glog.V(5).Infof("namespace controller - deleteCollection not supported - namespace: %s, gvr: %v", namespace, gvr)
|
glog.V(5).Infof("namespace controller - deleteCollection not supported - namespace: %s, gvr: %v", namespace, gvr)
|
||||||
opCache.setNotSupported(key)
|
d.opCache.SetNotSupported(key)
|
||||||
return false, nil
|
return false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -205,16 +322,12 @@ func deleteCollection(
|
|||||||
// the list of items in the collection (if found)
|
// the list of items in the collection (if found)
|
||||||
// a boolean if the operation is supported
|
// a boolean if the operation is supported
|
||||||
// an error if the operation is supported but could not be completed.
|
// an error if the operation is supported but could not be completed.
|
||||||
func listCollection(
|
func (d *NamespacedResourcesDeleter) listCollection(
|
||||||
dynamicClient *dynamic.Client,
|
dynamicClient *dynamic.Client, gvr schema.GroupVersionResource, namespace string) (*unstructured.UnstructuredList, bool, error) {
|
||||||
opCache *operationNotSupportedCache,
|
|
||||||
gvr schema.GroupVersionResource,
|
|
||||||
namespace string,
|
|
||||||
) (*unstructured.UnstructuredList, bool, error) {
|
|
||||||
glog.V(5).Infof("namespace controller - listCollection - namespace: %s, gvr: %v", namespace, gvr)
|
glog.V(5).Infof("namespace controller - listCollection - namespace: %s, gvr: %v", namespace, gvr)
|
||||||
|
|
||||||
key := operationKey{op: operationList, gvr: gvr}
|
key := OperationKey{Operation: OperationList, Gvr: gvr}
|
||||||
if !opCache.isSupported(key) {
|
if !d.opCache.isSupported(key) {
|
||||||
glog.V(5).Infof("namespace controller - listCollection ignored since not supported - namespace: %s, gvr: %v", namespace, gvr)
|
glog.V(5).Infof("namespace controller - listCollection ignored since not supported - namespace: %s, gvr: %v", namespace, gvr)
|
||||||
return nil, false, nil
|
return nil, false, nil
|
||||||
}
|
}
|
||||||
@ -237,7 +350,7 @@ func listCollection(
|
|||||||
// remember next time that this resource does not support delete collection...
|
// remember next time that this resource does not support delete collection...
|
||||||
if errors.IsMethodNotSupported(err) || errors.IsNotFound(err) {
|
if errors.IsMethodNotSupported(err) || errors.IsNotFound(err) {
|
||||||
glog.V(5).Infof("namespace controller - listCollection not supported - namespace: %s, gvr: %v", namespace, gvr)
|
glog.V(5).Infof("namespace controller - listCollection not supported - namespace: %s, gvr: %v", namespace, gvr)
|
||||||
opCache.setNotSupported(key)
|
d.opCache.SetNotSupported(key)
|
||||||
return nil, false, nil
|
return nil, false, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -245,15 +358,11 @@ func listCollection(
|
|||||||
}
|
}
|
||||||
|
|
||||||
// deleteEachItem is a helper function that will list the collection of resources and delete each item 1 by 1.
|
// deleteEachItem is a helper function that will list the collection of resources and delete each item 1 by 1.
|
||||||
func deleteEachItem(
|
func (d *NamespacedResourcesDeleter) deleteEachItem(
|
||||||
dynamicClient *dynamic.Client,
|
dynamicClient *dynamic.Client, gvr schema.GroupVersionResource, namespace string) error {
|
||||||
opCache *operationNotSupportedCache,
|
|
||||||
gvr schema.GroupVersionResource,
|
|
||||||
namespace string,
|
|
||||||
) error {
|
|
||||||
glog.V(5).Infof("namespace controller - deleteEachItem - namespace: %s, gvr: %v", namespace, gvr)
|
glog.V(5).Infof("namespace controller - deleteEachItem - namespace: %s, gvr: %v", namespace, gvr)
|
||||||
|
|
||||||
unstructuredList, listSupported, err := listCollection(dynamicClient, opCache, gvr, namespace)
|
unstructuredList, listSupported, err := d.listCollection(dynamicClient, gvr, namespace)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -272,18 +381,13 @@ func deleteEachItem(
|
|||||||
// deleteAllContentForGroupVersionResource will use the dynamic client to delete each resource identified in gvr.
|
// deleteAllContentForGroupVersionResource will use the dynamic client to delete each resource identified in gvr.
|
||||||
// It returns an estimate of the time remaining before the remaining resources are deleted.
|
// It returns an estimate of the time remaining before the remaining resources are deleted.
|
||||||
// If estimate > 0, not all resources are guaranteed to be gone.
|
// If estimate > 0, not all resources are guaranteed to be gone.
|
||||||
func deleteAllContentForGroupVersionResource(
|
func (d *NamespacedResourcesDeleter) deleteAllContentForGroupVersionResource(
|
||||||
kubeClient clientset.Interface,
|
gvr schema.GroupVersionResource, namespace string,
|
||||||
clientPool dynamic.ClientPool,
|
namespaceDeletedAt metav1.Time) (int64, error) {
|
||||||
opCache *operationNotSupportedCache,
|
|
||||||
gvr schema.GroupVersionResource,
|
|
||||||
namespace string,
|
|
||||||
namespaceDeletedAt metav1.Time,
|
|
||||||
) (int64, error) {
|
|
||||||
glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - namespace: %s, gvr: %v", namespace, gvr)
|
glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - namespace: %s, gvr: %v", namespace, gvr)
|
||||||
|
|
||||||
// estimate how long it will take for the resource to be deleted (needed for objects that support graceful delete)
|
// estimate how long it will take for the resource to be deleted (needed for objects that support graceful delete)
|
||||||
estimate, err := estimateGracefulTermination(kubeClient, gvr, namespace, namespaceDeletedAt)
|
estimate, err := d.estimateGracefulTermination(gvr, namespace, namespaceDeletedAt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - unable to estimate - namespace: %s, gvr: %v, err: %v", namespace, gvr, err)
|
glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - unable to estimate - namespace: %s, gvr: %v, err: %v", namespace, gvr, err)
|
||||||
return estimate, err
|
return estimate, err
|
||||||
@ -291,21 +395,21 @@ func deleteAllContentForGroupVersionResource(
|
|||||||
glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - estimate - namespace: %s, gvr: %v, estimate: %v", namespace, gvr, estimate)
|
glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - estimate - namespace: %s, gvr: %v, estimate: %v", namespace, gvr, estimate)
|
||||||
|
|
||||||
// get a client for this group version...
|
// get a client for this group version...
|
||||||
dynamicClient, err := clientPool.ClientForGroupVersionResource(gvr)
|
dynamicClient, err := d.clientPool.ClientForGroupVersionResource(gvr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - unable to get client - namespace: %s, gvr: %v, err: %v", namespace, gvr, err)
|
glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - unable to get client - namespace: %s, gvr: %v, err: %v", namespace, gvr, err)
|
||||||
return estimate, err
|
return estimate, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// first try to delete the entire collection
|
// first try to delete the entire collection
|
||||||
deleteCollectionSupported, err := deleteCollection(dynamicClient, opCache, gvr, namespace)
|
deleteCollectionSupported, err := d.deleteCollection(dynamicClient, gvr, namespace)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return estimate, err
|
return estimate, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// delete collection was not supported, so we list and delete each item...
|
// delete collection was not supported, so we list and delete each item...
|
||||||
if !deleteCollectionSupported {
|
if !deleteCollectionSupported {
|
||||||
err = deleteEachItem(dynamicClient, opCache, gvr, namespace)
|
err = d.deleteEachItem(dynamicClient, gvr, namespace)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return estimate, err
|
return estimate, err
|
||||||
}
|
}
|
||||||
@ -314,7 +418,7 @@ func deleteAllContentForGroupVersionResource(
|
|||||||
// verify there are no more remaining items
|
// verify there are no more remaining items
|
||||||
// it is not an error condition for there to be remaining items if local estimate is non-zero
|
// it is not an error condition for there to be remaining items if local estimate is non-zero
|
||||||
glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - checking for no more items in namespace: %s, gvr: %v", namespace, gvr)
|
glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - checking for no more items in namespace: %s, gvr: %v", namespace, gvr)
|
||||||
unstructuredList, listSupported, err := listCollection(dynamicClient, opCache, gvr, namespace)
|
unstructuredList, listSupported, err := d.listCollection(dynamicClient, gvr, namespace)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - error verifying no items in namespace: %s, gvr: %v, err: %v", namespace, gvr, err)
|
glog.V(5).Infof("namespace controller - deleteAllContentForGroupVersionResource - error verifying no items in namespace: %s, gvr: %v, err: %v", namespace, gvr, err)
|
||||||
return estimate, err
|
return estimate, err
|
||||||
@ -340,18 +444,22 @@ func deleteAllContentForGroupVersionResource(
|
|||||||
// deleteAllContent will use the dynamic client to delete each resource identified in groupVersionResources.
|
// deleteAllContent will use the dynamic client to delete each resource identified in groupVersionResources.
|
||||||
// It returns an estimate of the time remaining before the remaining resources are deleted.
|
// It returns an estimate of the time remaining before the remaining resources are deleted.
|
||||||
// If estimate > 0, not all resources are guaranteed to be gone.
|
// If estimate > 0, not all resources are guaranteed to be gone.
|
||||||
func deleteAllContent(
|
func (d *NamespacedResourcesDeleter) deleteAllContent(
|
||||||
kubeClient clientset.Interface,
|
namespace string, namespaceDeletedAt metav1.Time) (int64, error) {
|
||||||
clientPool dynamic.ClientPool,
|
|
||||||
opCache *operationNotSupportedCache,
|
|
||||||
groupVersionResources map[schema.GroupVersionResource]struct{},
|
|
||||||
namespace string,
|
|
||||||
namespaceDeletedAt metav1.Time,
|
|
||||||
) (int64, error) {
|
|
||||||
estimate := int64(0)
|
estimate := int64(0)
|
||||||
glog.V(4).Infof("namespace controller - deleteAllContent - namespace: %s, gvrs: %v", namespace, groupVersionResources)
|
glog.V(4).Infof("namespace controller - deleteAllContent - namespace: %s", namespace)
|
||||||
|
resources, err := d.discoverResourcesFn()
|
||||||
|
if err != nil {
|
||||||
|
return estimate, err
|
||||||
|
}
|
||||||
|
// TODO(sttts): get rid of opCache and pass the verbs (especially "deletecollection") down into the deleter
|
||||||
|
deletableResources := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"delete"}}, resources)
|
||||||
|
groupVersionResources, err := discovery.GroupVersionResources(deletableResources)
|
||||||
|
if err != nil {
|
||||||
|
return estimate, err
|
||||||
|
}
|
||||||
for gvr := range groupVersionResources {
|
for gvr := range groupVersionResources {
|
||||||
gvrEstimate, err := deleteAllContentForGroupVersionResource(kubeClient, clientPool, opCache, gvr, namespace, namespaceDeletedAt)
|
gvrEstimate, err := d.deleteAllContentForGroupVersionResource(gvr, namespace, namespaceDeletedAt)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return estimate, err
|
return estimate, err
|
||||||
}
|
}
|
||||||
@ -363,112 +471,19 @@ func deleteAllContent(
|
|||||||
return estimate, nil
|
return estimate, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// syncNamespace orchestrates deletion of a Namespace and its associated content.
|
|
||||||
func syncNamespace(
|
|
||||||
kubeClient clientset.Interface,
|
|
||||||
clientPool dynamic.ClientPool,
|
|
||||||
opCache *operationNotSupportedCache,
|
|
||||||
discoverResourcesFn func() ([]*metav1.APIResourceList, error),
|
|
||||||
namespace *v1.Namespace,
|
|
||||||
finalizerToken v1.FinalizerName,
|
|
||||||
) error {
|
|
||||||
if namespace.DeletionTimestamp == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// multiple controllers may edit a namespace during termination
|
|
||||||
// first get the latest state of the namespace before proceeding
|
|
||||||
// if the namespace was deleted already, don't do anything
|
|
||||||
namespace, err := kubeClient.Core().Namespaces().Get(namespace.Name, metav1.GetOptions{})
|
|
||||||
if err != nil {
|
|
||||||
if errors.IsNotFound(err) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
glog.V(5).Infof("namespace controller - syncNamespace - namespace: %s, finalizerToken: %s", namespace.Name, finalizerToken)
|
|
||||||
|
|
||||||
// ensure that the status is up to date on the namespace
|
|
||||||
// if we get a not found error, we assume the namespace is truly gone
|
|
||||||
namespace, err = retryOnConflictError(kubeClient, namespace, updateNamespaceStatusFunc)
|
|
||||||
if err != nil {
|
|
||||||
if errors.IsNotFound(err) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// the latest view of the namespace asserts that namespace is no longer deleting..
|
|
||||||
if namespace.DeletionTimestamp.IsZero() {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// if the namespace is already finalized, delete it
|
|
||||||
if finalized(namespace) {
|
|
||||||
var opts *metav1.DeleteOptions
|
|
||||||
uid := namespace.UID
|
|
||||||
if len(uid) > 0 {
|
|
||||||
opts = &metav1.DeleteOptions{Preconditions: &metav1.Preconditions{UID: &uid}}
|
|
||||||
}
|
|
||||||
err = kubeClient.Core().Namespaces().Delete(namespace.Name, opts)
|
|
||||||
if err != nil && !errors.IsNotFound(err) {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// there may still be content for us to remove
|
|
||||||
resources, err := discoverResourcesFn()
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
// TODO(sttts): get rid of opCache and pass the verbs (especially "deletecollection") down into the deleter
|
|
||||||
deletableResources := discovery.FilteredBy(discovery.SupportsAllVerbs{Verbs: []string{"delete"}}, resources)
|
|
||||||
groupVersionResources, err := discovery.GroupVersionResources(deletableResources)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
estimate, err := deleteAllContent(kubeClient, clientPool, opCache, groupVersionResources, namespace.Name, *namespace.DeletionTimestamp)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
if estimate > 0 {
|
|
||||||
return &contentRemainingError{estimate}
|
|
||||||
}
|
|
||||||
|
|
||||||
// we have removed content, so mark it finalized by us
|
|
||||||
result, err := retryOnConflictError(kubeClient, namespace, finalizeNamespaceFunc(finalizerToken))
|
|
||||||
if err != nil {
|
|
||||||
// in normal practice, this should not be possible, but if a deployment is running
|
|
||||||
// two controllers to do namespace deletion that share a common finalizer token it's
|
|
||||||
// possible that a not found could occur since the other controller would have finished the delete.
|
|
||||||
if errors.IsNotFound(err) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// now check if all finalizers have reported that we delete now
|
|
||||||
if finalized(result) {
|
|
||||||
err = kubeClient.Core().Namespaces().Delete(namespace.Name, nil)
|
|
||||||
if err != nil && !errors.IsNotFound(err) {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// estimateGrracefulTermination will estimate the graceful termination required for the specific entity in the namespace
|
// estimateGrracefulTermination will estimate the graceful termination required for the specific entity in the namespace
|
||||||
func estimateGracefulTermination(kubeClient clientset.Interface, groupVersionResource schema.GroupVersionResource, ns string, namespaceDeletedAt metav1.Time) (int64, error) {
|
func (d *NamespacedResourcesDeleter) estimateGracefulTermination(gvr schema.GroupVersionResource, ns string, namespaceDeletedAt metav1.Time) (int64, error) {
|
||||||
groupResource := groupVersionResource.GroupResource()
|
groupResource := gvr.GroupResource()
|
||||||
glog.V(5).Infof("namespace controller - estimateGracefulTermination - group %s, resource: %s", groupResource.Group, groupResource.Resource)
|
glog.V(5).Infof("namespace controller - estimateGracefulTermination - group %s, resource: %s", groupResource.Group, groupResource.Resource)
|
||||||
estimate := int64(0)
|
estimate := int64(0)
|
||||||
var err error
|
var err error
|
||||||
switch groupResource {
|
switch groupResource {
|
||||||
case schema.GroupResource{Group: "", Resource: "pods"}:
|
case schema.GroupResource{Group: "", Resource: "pods"}:
|
||||||
estimate, err = estimateGracefulTerminationForPods(kubeClient, ns)
|
dynamicClient, err := d.clientPool.ClientForGroupVersionResource(gvr)
|
||||||
|
if err != nil {
|
||||||
|
return estimate, fmt.Errorf("error in creating dynamic client for resource: %s", gvr)
|
||||||
|
}
|
||||||
|
estimate, err = d.estimateGracefulTerminationForPods(dynamicClient, ns)
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return estimate, err
|
return estimate, err
|
||||||
@ -483,21 +498,34 @@ func estimateGracefulTermination(kubeClient clientset.Interface, groupVersionRes
|
|||||||
}
|
}
|
||||||
|
|
||||||
// estimateGracefulTerminationForPods determines the graceful termination period for pods in the namespace
|
// estimateGracefulTerminationForPods determines the graceful termination period for pods in the namespace
|
||||||
func estimateGracefulTerminationForPods(kubeClient clientset.Interface, ns string) (int64, error) {
|
func (d *NamespacedResourcesDeleter) estimateGracefulTerminationForPods(dynamicClient *dynamic.Client, ns string) (int64, error) {
|
||||||
glog.V(5).Infof("namespace controller - estimateGracefulTerminationForPods - namespace %s", ns)
|
glog.V(5).Infof("namespace controller - estimateGracefulTerminationForPods - namespace %s", ns)
|
||||||
estimate := int64(0)
|
estimate := int64(0)
|
||||||
items, err := kubeClient.Core().Pods(ns).List(metav1.ListOptions{})
|
resource := &metav1.APIResource{
|
||||||
|
Namespaced: true,
|
||||||
|
Kind: "Pod",
|
||||||
|
}
|
||||||
|
listResponse, err := dynamicClient.Resource(resource, ns).List(&metav1.ListOptions{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return estimate, err
|
return estimate, err
|
||||||
}
|
}
|
||||||
|
items, ok := listResponse.(*unstructured.UnstructuredList)
|
||||||
|
if !ok {
|
||||||
|
return estimate, fmt.Errorf("unexpected: expected type unstructured.UnstructuredList, got: %#v", listResponse)
|
||||||
|
}
|
||||||
for i := range items.Items {
|
for i := range items.Items {
|
||||||
|
item := items.Items[i]
|
||||||
|
pod, err := unstructuredToPod(item)
|
||||||
|
if err != nil {
|
||||||
|
return estimate, fmt.Errorf("unexpected: expected type v1.Pod, got: %#v", item)
|
||||||
|
}
|
||||||
// filter out terminal pods
|
// filter out terminal pods
|
||||||
phase := items.Items[i].Status.Phase
|
phase := pod.Status.Phase
|
||||||
if v1.PodSucceeded == phase || v1.PodFailed == phase {
|
if v1.PodSucceeded == phase || v1.PodFailed == phase {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if items.Items[i].Spec.TerminationGracePeriodSeconds != nil {
|
if pod.Spec.TerminationGracePeriodSeconds != nil {
|
||||||
grace := *items.Items[i].Spec.TerminationGracePeriodSeconds
|
grace := *pod.Spec.TerminationGracePeriodSeconds
|
||||||
if grace > estimate {
|
if grace > estimate {
|
||||||
estimate = grace
|
estimate = grace
|
||||||
}
|
}
|
||||||
@ -505,3 +533,13 @@ func estimateGracefulTerminationForPods(kubeClient clientset.Interface, ns strin
|
|||||||
}
|
}
|
||||||
return estimate, nil
|
return estimate, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func unstructuredToPod(obj *unstructured.Unstructured) (*v1.Pod, error) {
|
||||||
|
json, err := runtime.Encode(unstructured.UnstructuredJSONScheme, obj)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
pod := new(v1.Pod)
|
||||||
|
err = runtime.DecodeInto(api.Codecs.UniversalDecoder(), json, pod)
|
||||||
|
return pod, err
|
||||||
|
}
|
@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
|
|||||||
limitations under the License.
|
limitations under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
package namespace
|
package deletion
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
@ -36,7 +36,6 @@ import (
|
|||||||
core "k8s.io/client-go/testing"
|
core "k8s.io/client-go/testing"
|
||||||
"k8s.io/kubernetes/pkg/api"
|
"k8s.io/kubernetes/pkg/api"
|
||||||
"k8s.io/kubernetes/pkg/api/v1"
|
"k8s.io/kubernetes/pkg/api/v1"
|
||||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
|
||||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
|
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset/fake"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -66,7 +65,11 @@ func TestFinalizeNamespaceFunc(t *testing.T) {
|
|||||||
Finalizers: []v1.FinalizerName{"kubernetes", "other"},
|
Finalizers: []v1.FinalizerName{"kubernetes", "other"},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
finalizeNamespace(mockClient, testNamespace, v1.FinalizerKubernetes)
|
d := NamespacedResourcesDeleter{
|
||||||
|
nsClient: mockClient.Core().Namespaces(),
|
||||||
|
finalizerToken: v1.FinalizerKubernetes,
|
||||||
|
}
|
||||||
|
d.finalizeNamespace(testNamespace)
|
||||||
actions := mockClient.Actions()
|
actions := mockClient.Actions()
|
||||||
if len(actions) != 1 {
|
if len(actions) != 1 {
|
||||||
t.Errorf("Expected 1 mock client action, but got %v", len(actions))
|
t.Errorf("Expected 1 mock client action, but got %v", len(actions))
|
||||||
@ -127,6 +130,11 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *metav1.APIVersio
|
|||||||
dynamicClientActionSet.Insert((&fakeAction{method: "GET", path: urlPath}).String())
|
dynamicClientActionSet.Insert((&fakeAction{method: "GET", path: urlPath}).String())
|
||||||
dynamicClientActionSet.Insert((&fakeAction{method: "DELETE", path: urlPath}).String())
|
dynamicClientActionSet.Insert((&fakeAction{method: "DELETE", path: urlPath}).String())
|
||||||
}
|
}
|
||||||
|
// One additional GET for listing pods (to estimate graceful deletion).
|
||||||
|
urlPath := path.Join([]string{
|
||||||
|
dynamic.LegacyAPIPathResolverFunc(schema.GroupVersionKind{Group: "", Version: "v1"}),
|
||||||
|
"", "v1", "namespaces", namespaceName}...)
|
||||||
|
dynamicClientActionSet.Insert((&fakeAction{method: "GET", path: urlPath}).String())
|
||||||
|
|
||||||
scenarios := map[string]struct {
|
scenarios := map[string]struct {
|
||||||
testNamespace *v1.Namespace
|
testNamespace *v1.Namespace
|
||||||
@ -139,7 +147,6 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *metav1.APIVersio
|
|||||||
kubeClientActionSet: sets.NewString(
|
kubeClientActionSet: sets.NewString(
|
||||||
strings.Join([]string{"get", "namespaces", ""}, "-"),
|
strings.Join([]string{"get", "namespaces", ""}, "-"),
|
||||||
strings.Join([]string{"create", "namespaces", "finalize"}, "-"),
|
strings.Join([]string{"create", "namespaces", "finalize"}, "-"),
|
||||||
strings.Join([]string{"list", "pods", ""}, "-"),
|
|
||||||
strings.Join([]string{"delete", "namespaces", ""}, "-"),
|
strings.Join([]string{"delete", "namespaces", ""}, "-"),
|
||||||
),
|
),
|
||||||
dynamicClientActionSet: dynamicClientActionSet,
|
dynamicClientActionSet: dynamicClientActionSet,
|
||||||
@ -174,8 +181,8 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *metav1.APIVersio
|
|||||||
fn := func() ([]*metav1.APIResourceList, error) {
|
fn := func() ([]*metav1.APIResourceList, error) {
|
||||||
return resources, nil
|
return resources, nil
|
||||||
}
|
}
|
||||||
|
d := NewNamespacedResourcesDeleter(mockClient.Core().Namespaces(), clientPool, &OperationNotSupportedCache{M: make(map[OperationKey]bool)}, fn, v1.FinalizerKubernetes, true)
|
||||||
err := syncNamespace(mockClient, clientPool, &operationNotSupportedCache{m: make(map[operationKey]bool)}, fn, testInput.testNamespace, v1.FinalizerKubernetes)
|
err := d.Delete(testInput.testNamespace.Name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("scenario %s - Unexpected error when synching namespace %v", scenario, err)
|
t.Errorf("scenario %s - Unexpected error when synching namespace %v", scenario, err)
|
||||||
}
|
}
|
||||||
@ -205,7 +212,7 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *metav1.APIVersio
|
|||||||
func TestRetryOnConflictError(t *testing.T) {
|
func TestRetryOnConflictError(t *testing.T) {
|
||||||
mockClient := &fake.Clientset{}
|
mockClient := &fake.Clientset{}
|
||||||
numTries := 0
|
numTries := 0
|
||||||
retryOnce := func(kubeClient clientset.Interface, namespace *v1.Namespace) (*v1.Namespace, error) {
|
retryOnce := func(namespace *v1.Namespace) (*v1.Namespace, error) {
|
||||||
numTries++
|
numTries++
|
||||||
if numTries <= 1 {
|
if numTries <= 1 {
|
||||||
return namespace, errors.NewConflict(api.Resource("namespaces"), namespace.Name, fmt.Errorf("ERROR!"))
|
return namespace, errors.NewConflict(api.Resource("namespaces"), namespace.Name, fmt.Errorf("ERROR!"))
|
||||||
@ -213,7 +220,10 @@ func TestRetryOnConflictError(t *testing.T) {
|
|||||||
return namespace, nil
|
return namespace, nil
|
||||||
}
|
}
|
||||||
namespace := &v1.Namespace{}
|
namespace := &v1.Namespace{}
|
||||||
_, err := retryOnConflictError(mockClient, namespace, retryOnce)
|
d := NamespacedResourcesDeleter{
|
||||||
|
nsClient: mockClient.Core().Namespaces(),
|
||||||
|
}
|
||||||
|
_, err := d.retryOnConflictError(namespace, retryOnce)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error %v", err)
|
t.Errorf("Unexpected error %v", err)
|
||||||
}
|
}
|
||||||
@ -247,12 +257,19 @@ func TestSyncNamespaceThatIsActive(t *testing.T) {
|
|||||||
fn := func() ([]*metav1.APIResourceList, error) {
|
fn := func() ([]*metav1.APIResourceList, error) {
|
||||||
return testResources(), nil
|
return testResources(), nil
|
||||||
}
|
}
|
||||||
err := syncNamespace(mockClient, nil, &operationNotSupportedCache{m: make(map[operationKey]bool)}, fn, testNamespace, v1.FinalizerKubernetes)
|
//err := syncNamespace(mockClient, nil, &operationNotSupportedCache{m: make(map[operationKey]bool)}, fn, testNamespace, v1.FinalizerKubernetes)
|
||||||
|
d := NewNamespacedResourcesDeleter(mockClient.Core().Namespaces(), nil,
|
||||||
|
&OperationNotSupportedCache{M: make(map[OperationKey]bool)}, fn, v1.FinalizerKubernetes, true)
|
||||||
|
err := d.Delete(testNamespace.Name)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("Unexpected error when synching namespace %v", err)
|
t.Errorf("Unexpected error when synching namespace %v", err)
|
||||||
}
|
}
|
||||||
if len(mockClient.Actions()) != 0 {
|
if len(mockClient.Actions()) != 1 {
|
||||||
t.Errorf("Expected no action from controller, but got: %v", mockClient.Actions())
|
t.Errorf("Expected only one action from controller, but got: %d %v", len(mockClient.Actions()), mockClient.Actions())
|
||||||
|
}
|
||||||
|
action := mockClient.Actions()[0]
|
||||||
|
if !action.Matches("get", "namespaces") {
|
||||||
|
t.Errorf("Expected get namespaces, got: %v", action)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -32,6 +32,7 @@ import (
|
|||||||
"k8s.io/kubernetes/pkg/api/v1"
|
"k8s.io/kubernetes/pkg/api/v1"
|
||||||
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
"k8s.io/kubernetes/pkg/client/clientset_generated/clientset"
|
||||||
"k8s.io/kubernetes/pkg/controller"
|
"k8s.io/kubernetes/pkg/controller"
|
||||||
|
"k8s.io/kubernetes/pkg/controller/namespace/deletion"
|
||||||
"k8s.io/kubernetes/pkg/util/metrics"
|
"k8s.io/kubernetes/pkg/util/metrics"
|
||||||
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
@ -62,9 +63,11 @@ type NamespaceController struct {
|
|||||||
// function to list of preferred resources for namespace deletion
|
// function to list of preferred resources for namespace deletion
|
||||||
discoverResourcesFn func() ([]*metav1.APIResourceList, error)
|
discoverResourcesFn func() ([]*metav1.APIResourceList, error)
|
||||||
// opCache is a cache to remember if a particular operation is not supported to aid dynamic client.
|
// opCache is a cache to remember if a particular operation is not supported to aid dynamic client.
|
||||||
opCache *operationNotSupportedCache
|
opCache *deletion.OperationNotSupportedCache
|
||||||
// finalizerToken is the finalizer token managed by this controller
|
// finalizerToken is the finalizer token managed by this controller
|
||||||
finalizerToken v1.FinalizerName
|
finalizerToken v1.FinalizerName
|
||||||
|
// helper to delete all resources in the namespace when the namespace is deleted.
|
||||||
|
namespacedResourcesDeleter *deletion.NamespacedResourcesDeleter
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewNamespaceController creates a new NamespaceController
|
// NewNamespaceController creates a new NamespaceController
|
||||||
@ -75,8 +78,8 @@ func NewNamespaceController(
|
|||||||
resyncPeriod time.Duration,
|
resyncPeriod time.Duration,
|
||||||
finalizerToken v1.FinalizerName) *NamespaceController {
|
finalizerToken v1.FinalizerName) *NamespaceController {
|
||||||
|
|
||||||
opCache := &operationNotSupportedCache{
|
opCache := &deletion.OperationNotSupportedCache{
|
||||||
m: make(map[operationKey]bool),
|
M: make(map[deletion.OperationKey]bool),
|
||||||
}
|
}
|
||||||
|
|
||||||
// pre-fill opCache with the discovery info
|
// pre-fill opCache with the discovery info
|
||||||
@ -102,9 +105,9 @@ func NewNamespaceController(
|
|||||||
glog.V(6).Infof("Skipping resource %v because it cannot be deleted.", gvr)
|
glog.V(6).Infof("Skipping resource %v because it cannot be deleted.", gvr)
|
||||||
}
|
}
|
||||||
|
|
||||||
for _, op := range []operation{operationList, operationDeleteCollection} {
|
for _, op := range []deletion.Operation{deletion.OperationList, deletion.OperationDeleteCollection} {
|
||||||
if !verbs.Has(string(op)) {
|
if !verbs.Has(string(op)) {
|
||||||
opCache.setNotSupported(operationKey{op: op, gvr: gvr})
|
opCache.SetNotSupported(deletion.OperationKey{Operation: op, Gvr: gvr})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -114,12 +117,13 @@ func NewNamespaceController(
|
|||||||
|
|
||||||
// create the controller so we can inject the enqueue function
|
// create the controller so we can inject the enqueue function
|
||||||
namespaceController := &NamespaceController{
|
namespaceController := &NamespaceController{
|
||||||
kubeClient: kubeClient,
|
kubeClient: kubeClient,
|
||||||
clientPool: clientPool,
|
clientPool: clientPool,
|
||||||
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "namespace"),
|
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "namespace"),
|
||||||
discoverResourcesFn: discoverResourcesFn,
|
discoverResourcesFn: discoverResourcesFn,
|
||||||
opCache: opCache,
|
opCache: opCache,
|
||||||
finalizerToken: finalizerToken,
|
finalizerToken: finalizerToken,
|
||||||
|
namespacedResourcesDeleter: deletion.NewNamespacedResourcesDeleter(kubeClient.Core().Namespaces(), clientPool, opCache, discoverResourcesFn, finalizerToken, true),
|
||||||
}
|
}
|
||||||
|
|
||||||
if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil {
|
if kubeClient != nil && kubeClient.Core().RESTClient().GetRateLimiter() != nil {
|
||||||
@ -187,7 +191,7 @@ func (nm *NamespaceController) worker() {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
if estimate, ok := err.(*contentRemainingError); ok {
|
if estimate, ok := err.(*deletion.ResourcesRemainingError); ok {
|
||||||
t := estimate.Estimate/2 + 1
|
t := estimate.Estimate/2 + 1
|
||||||
glog.V(4).Infof("Content remaining in namespace %s, waiting %d seconds", key, t)
|
glog.V(4).Infof("Content remaining in namespace %s, waiting %d seconds", key, t)
|
||||||
nm.queue.AddAfter(key, time.Duration(t)*time.Second)
|
nm.queue.AddAfter(key, time.Duration(t)*time.Second)
|
||||||
@ -224,7 +228,7 @@ func (nm *NamespaceController) syncNamespaceFromKey(key string) (err error) {
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
namespace := obj.(*v1.Namespace)
|
namespace := obj.(*v1.Namespace)
|
||||||
return syncNamespace(nm.kubeClient, nm.clientPool, nm.opCache, nm.discoverResourcesFn, namespace, nm.finalizerToken)
|
return nm.namespacedResourcesDeleter.Delete(namespace.Name)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Run starts observing the system with the specified number of workers.
|
// Run starts observing the system with the specified number of workers.
|
||||||
|
1001
test/test_owners.csv
1001
test/test_owners.csv
File diff suppressed because it is too large
Load Diff
Loading…
Reference in New Issue
Block a user