Merge pull request #21400 from derekwaynecarr/namespace_deletion_discovery

Auto commit by PR queue bot
This commit is contained in:
k8s-merge-robot 2016-03-03 21:33:01 -08:00
commit 258eac505f
7 changed files with 623 additions and 249 deletions

View File

@ -40,6 +40,7 @@ import (
"k8s.io/kubernetes/pkg/client/leaderelection" "k8s.io/kubernetes/pkg/client/leaderelection"
"k8s.io/kubernetes/pkg/client/record" "k8s.io/kubernetes/pkg/client/record"
"k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/typed/dynamic"
client "k8s.io/kubernetes/pkg/client/unversioned" client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd" "k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
"k8s.io/kubernetes/pkg/cloudprovider" "k8s.io/kubernetes/pkg/cloudprovider"
@ -266,7 +267,14 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig
glog.Fatalf("Failed to get supported resources from server: %v", err) glog.Fatalf("Failed to get supported resources from server: %v", err)
} }
namespaceController := namespacecontroller.NewNamespaceController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "namespace-controller")), versions, s.NamespaceSyncPeriod.Duration) // Find the list of namespaced resources via discovery that the namespace controller must manage
namespaceKubeClient := clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "namespace-controller"))
namespaceClientPool := dynamic.NewClientPool(restclient.AddUserAgent(kubeconfig, "namespace-controller"), dynamic.LegacyAPIPathResolverFunc)
groupVersionResources, err := namespacecontroller.ServerPreferredNamespacedGroupVersionResources(namespaceKubeClient.Discovery())
if err != nil {
glog.Fatalf("Failed to get supported resources from server: %v", err)
}
namespaceController := namespacecontroller.NewNamespaceController(namespaceKubeClient, namespaceClientPool, groupVersionResources, s.NamespaceSyncPeriod.Duration, api.FinalizerKubernetes)
go namespaceController.Run(s.ConcurrentNamespaceSyncs, wait.NeverStop) go namespaceController.Run(s.ConcurrentNamespaceSyncs, wait.NeverStop)
groupVersion := "extensions/v1beta1" groupVersion := "extensions/v1beta1"

View File

@ -32,6 +32,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/restclient" "k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/typed/dynamic"
client "k8s.io/kubernetes/pkg/client/unversioned" client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/client/unversioned/clientcmd" "k8s.io/kubernetes/pkg/client/unversioned/clientcmd"
clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api" clientcmdapi "k8s.io/kubernetes/pkg/client/unversioned/clientcmd/api"
@ -214,7 +215,14 @@ func (s *CMServer) Run(_ []string) error {
glog.Fatalf("Failed to get supported resources from server: %v", err) glog.Fatalf("Failed to get supported resources from server: %v", err)
} }
namespaceController := namespacecontroller.NewNamespaceController(clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "namespace-controller")), versions, s.NamespaceSyncPeriod.Duration) // Find the list of namespaced resources via discovery that the namespace controller must manage
namespaceKubeClient := clientset.NewForConfigOrDie(restclient.AddUserAgent(kubeconfig, "namespace-controller"))
namespaceClientPool := dynamic.NewClientPool(restclient.AddUserAgent(kubeconfig, "namespace-controller"), dynamic.LegacyAPIPathResolverFunc)
groupVersionResources, err := namespacecontroller.ServerPreferredNamespacedGroupVersionResources(namespaceKubeClient.Discovery())
if err != nil {
glog.Fatalf("Failed to get supported resources from server: %v", err)
}
namespaceController := namespacecontroller.NewNamespaceController(namespaceKubeClient, namespaceClientPool, groupVersionResources, s.NamespaceSyncPeriod.Duration, api.FinalizerKubernetes)
go namespaceController.Run(s.ConcurrentNamespaceSyncs, wait.NeverStop) go namespaceController.Run(s.ConcurrentNamespaceSyncs, wait.NeverStop)
groupVersion := "extensions/v1beta1" groupVersion := "extensions/v1beta1"

View File

@ -0,0 +1,85 @@
/*
Copyright 2016 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package dynamic
import (
"sync"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/restclient"
)
// ClientPool manages a pool of dynamic clients.
type ClientPool interface {
// ClientForGroupVersion returns a client configured for the specified groupVersion.
ClientForGroupVersion(groupVersion unversioned.GroupVersion) (*Client, error)
}
// APIPathResolverFunc knows how to convert a groupVersion to its API path.
type APIPathResolverFunc func(groupVersion unversioned.GroupVersion) string
// LegacyAPIPathResolverFunc can resolve paths properly with the legacy API.
func LegacyAPIPathResolverFunc(groupVersion unversioned.GroupVersion) string {
if len(groupVersion.Group) == 0 {
return "/api"
}
return "/apis"
}
// clientPoolImpl implements Factory
type clientPoolImpl struct {
lock sync.RWMutex
config *restclient.Config
clients map[unversioned.GroupVersion]*Client
apiPathResolverFunc APIPathResolverFunc
}
// NewClientPool returns a ClientPool from the specified config
func NewClientPool(config *restclient.Config, apiPathResolverFunc APIPathResolverFunc) ClientPool {
return &clientPoolImpl{
config: config,
clients: map[unversioned.GroupVersion]*Client{},
apiPathResolverFunc: apiPathResolverFunc,
}
}
// ClientForGroupVersion returns a client for the specified groupVersion, creates one if none exists
func (c *clientPoolImpl) ClientForGroupVersion(groupVersion unversioned.GroupVersion) (*Client, error) {
c.lock.Lock()
defer c.lock.Unlock()
// do we have a client already configured?
if existingClient, found := c.clients[groupVersion]; found {
return existingClient, nil
}
// avoid changing the original config
confCopy := *c.config
conf := &confCopy
// we need to set the api path based on group version, if no group, default to legacy path
conf.APIPath = c.apiPathResolverFunc(groupVersion)
// we need to make a client
conf.GroupVersion = &groupVersion
dynamicClient, err := NewClient(conf)
if err != nil {
return nil, err
}
c.clients[groupVersion] = dynamicClient
return dynamicClient, nil
}

View File

@ -23,6 +23,7 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/client/cache" "k8s.io/kubernetes/pkg/client/cache"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/typed/dynamic"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/controller/framework"
"k8s.io/kubernetes/pkg/runtime" "k8s.io/kubernetes/pkg/runtime"
@ -38,23 +39,37 @@ import (
type NamespaceController struct { type NamespaceController struct {
// client that purges namespace content, must have list/delete privileges on all content // client that purges namespace content, must have list/delete privileges on all content
kubeClient clientset.Interface kubeClient clientset.Interface
// clientPool manages a pool of dynamic clients
clientPool dynamic.ClientPool
// store that holds the namespaces // store that holds the namespaces
store cache.Store store cache.Store
// controller that observes the namespaces // controller that observes the namespaces
controller *framework.Controller controller *framework.Controller
// namespaces that have been queued up for processing by workers // namespaces that have been queued up for processing by workers
queue *workqueue.Type queue *workqueue.Type
// list of versions to process // list of preferred group versions and their corresponding resource set for namespace deletion
versions *unversioned.APIVersions groupVersionResources []unversioned.GroupVersionResource
// opCache is a cache to remember if a particular operation is not supported to aid dynamic client.
opCache operationNotSupportedCache
// finalizerToken is the finalizer token managed by this controller
finalizerToken api.FinalizerName
} }
// NewNamespaceController creates a new NamespaceController // NewNamespaceController creates a new NamespaceController
func NewNamespaceController(kubeClient clientset.Interface, versions *unversioned.APIVersions, resyncPeriod time.Duration) *NamespaceController { func NewNamespaceController(
kubeClient clientset.Interface,
clientPool dynamic.ClientPool,
groupVersionResources []unversioned.GroupVersionResource,
resyncPeriod time.Duration,
finalizerToken api.FinalizerName) *NamespaceController {
// create the controller so we can inject the enqueue function // create the controller so we can inject the enqueue function
namespaceController := &NamespaceController{ namespaceController := &NamespaceController{
kubeClient: kubeClient, kubeClient: kubeClient,
versions: versions, clientPool: clientPool,
queue: workqueue.New(), queue: workqueue.New(),
groupVersionResources: groupVersionResources,
opCache: operationNotSupportedCache{},
finalizerToken: finalizerToken,
} }
// configure the backing store/controller // configure the backing store/controller
@ -144,7 +159,7 @@ func (nm *NamespaceController) syncNamespaceFromKey(key string) (err error) {
return err return err
} }
namespace := obj.(*api.Namespace) namespace := obj.(*api.Namespace)
return syncNamespace(nm.kubeClient, nm.versions, namespace) return syncNamespace(nm.kubeClient, nm.clientPool, nm.opCache, nm.groupVersionResources, namespace, nm.finalizerToken)
} }
// Run starts observing the system with the specified number of workers. // Run starts observing the system with the specified number of workers.

View File

@ -18,7 +18,11 @@ package namespace
import ( import (
"fmt" "fmt"
"net/http"
"net/http/httptest"
"path"
"strings" "strings"
"sync"
"testing" "testing"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
@ -26,7 +30,9 @@ import (
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
"k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake" "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset/fake"
"k8s.io/kubernetes/pkg/client/restclient"
"k8s.io/kubernetes/pkg/client/testing/core" "k8s.io/kubernetes/pkg/client/testing/core"
"k8s.io/kubernetes/pkg/client/typed/dynamic"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
) )
@ -56,7 +62,7 @@ func TestFinalizeNamespaceFunc(t *testing.T) {
Finalizers: []api.FinalizerName{"kubernetes", "other"}, Finalizers: []api.FinalizerName{"kubernetes", "other"},
}, },
} }
finalizeNamespaceFunc(mockClient, testNamespace) finalizeNamespace(mockClient, testNamespace, api.FinalizerKubernetes)
actions := mockClient.Actions() actions := mockClient.Actions()
if len(actions) != 1 { if len(actions) != 1 {
t.Errorf("Expected 1 mock client action, but got %v", len(actions)) t.Errorf("Expected 1 mock client action, but got %v", len(actions))
@ -75,9 +81,10 @@ func TestFinalizeNamespaceFunc(t *testing.T) {
func testSyncNamespaceThatIsTerminating(t *testing.T, versions *unversioned.APIVersions) { func testSyncNamespaceThatIsTerminating(t *testing.T, versions *unversioned.APIVersions) {
now := unversioned.Now() now := unversioned.Now()
namespaceName := "test"
testNamespacePendingFinalize := &api.Namespace{ testNamespacePendingFinalize := &api.Namespace{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Name: "test", Name: namespaceName,
ResourceVersion: "1", ResourceVersion: "1",
DeletionTimestamp: &now, DeletionTimestamp: &now,
}, },
@ -90,7 +97,7 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *unversioned.APIV
} }
testNamespaceFinalizeComplete := &api.Namespace{ testNamespaceFinalizeComplete := &api.Namespace{
ObjectMeta: api.ObjectMeta{ ObjectMeta: api.ObjectMeta{
Name: "test", Name: namespaceName,
ResourceVersion: "1", ResourceVersion: "1",
DeletionTimestamp: &now, DeletionTimestamp: &now,
}, },
@ -100,78 +107,77 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *unversioned.APIV
}, },
} }
// TODO: Reuse the constants for all these strings from testclient // when doing a delete all of content, we will do a GET of a collection, and DELETE of a collection by default
pendingActionSet := sets.NewString( dynamicClientActionSet := sets.NewString()
strings.Join([]string{"get", "namespaces", ""}, "-"), groupVersionResources := testGroupVersionResources()
strings.Join([]string{"delete-collection", "replicationcontrollers", ""}, "-"), for _, groupVersionResource := range groupVersionResources {
strings.Join([]string{"list", "services", ""}, "-"), urlPath := path.Join([]string{
strings.Join([]string{"list", "pods", ""}, "-"), dynamic.LegacyAPIPathResolverFunc(groupVersionResource.GroupVersion()),
strings.Join([]string{"delete-collection", "resourcequotas", ""}, "-"), groupVersionResource.Group,
strings.Join([]string{"delete-collection", "secrets", ""}, "-"), groupVersionResource.Version,
strings.Join([]string{"delete-collection", "configmaps", ""}, "-"), "namespaces",
strings.Join([]string{"delete-collection", "limitranges", ""}, "-"), namespaceName,
strings.Join([]string{"delete-collection", "events", ""}, "-"), groupVersionResource.Resource,
strings.Join([]string{"delete-collection", "serviceaccounts", ""}, "-"), }...)
strings.Join([]string{"delete-collection", "persistentvolumeclaims", ""}, "-"), dynamicClientActionSet.Insert((&fakeAction{method: "GET", path: urlPath}).String())
strings.Join([]string{"create", "namespaces", "finalize"}, "-"), dynamicClientActionSet.Insert((&fakeAction{method: "DELETE", path: urlPath}).String())
)
if containsVersion(versions, "extensions/v1beta1") {
pendingActionSet.Insert(
strings.Join([]string{"delete-collection", "daemonsets", ""}, "-"),
strings.Join([]string{"delete-collection", "deployments", ""}, "-"),
strings.Join([]string{"delete-collection", "replicasets", ""}, "-"),
strings.Join([]string{"delete-collection", "jobs", ""}, "-"),
strings.Join([]string{"delete-collection", "horizontalpodautoscalers", ""}, "-"),
strings.Join([]string{"delete-collection", "ingresses", ""}, "-"),
strings.Join([]string{"get", "resource", ""}, "-"),
)
} }
scenarios := map[string]struct { scenarios := map[string]struct {
testNamespace *api.Namespace testNamespace *api.Namespace
expectedActionSet sets.String kubeClientActionSet sets.String
dynamicClientActionSet sets.String
}{ }{
"pending-finalize": { "pending-finalize": {
testNamespace: testNamespacePendingFinalize, testNamespace: testNamespacePendingFinalize,
expectedActionSet: pendingActionSet, kubeClientActionSet: sets.NewString(
strings.Join([]string{"get", "namespaces", ""}, "-"),
strings.Join([]string{"list", "pods", ""}, "-"),
strings.Join([]string{"create", "namespaces", "finalize"}, "-"),
),
dynamicClientActionSet: dynamicClientActionSet,
}, },
"complete-finalize": { "complete-finalize": {
testNamespace: testNamespaceFinalizeComplete, testNamespace: testNamespaceFinalizeComplete,
expectedActionSet: sets.NewString( kubeClientActionSet: sets.NewString(
strings.Join([]string{"get", "namespaces", ""}, "-"), strings.Join([]string{"get", "namespaces", ""}, "-"),
strings.Join([]string{"delete", "namespaces", ""}, "-"), strings.Join([]string{"delete", "namespaces", ""}, "-"),
), ),
dynamicClientActionSet: sets.NewString(),
}, },
} }
for scenario, testInput := range scenarios { for scenario, testInput := range scenarios {
testHandler := &fakeActionHandler{statusCode: 200}
srv, clientConfig := testServerAndClientConfig(testHandler.ServeHTTP)
defer srv.Close()
mockClient := fake.NewSimpleClientset(testInput.testNamespace) mockClient := fake.NewSimpleClientset(testInput.testNamespace)
if containsVersion(versions, "extensions/v1beta1") { clientPool := dynamic.NewClientPool(clientConfig, dynamic.LegacyAPIPathResolverFunc)
resources := []unversioned.APIResource{}
for _, resource := range []string{"daemonsets", "deployments", "replicasets", "jobs", "horizontalpodautoscalers", "ingresses"} { err := syncNamespace(mockClient, clientPool, operationNotSupportedCache{}, groupVersionResources, testInput.testNamespace, api.FinalizerKubernetes)
resources = append(resources, unversioned.APIResource{Name: resource})
}
mockClient.Resources = map[string]*unversioned.APIResourceList{
"extensions/v1beta1": {
GroupVersion: "extensions/v1beta1",
APIResources: resources,
},
}
}
err := syncNamespace(mockClient, versions, testInput.testNamespace)
if err != nil { if err != nil {
t.Errorf("scenario %s - Unexpected error when synching namespace %v", scenario, err) t.Errorf("scenario %s - Unexpected error when synching namespace %v", scenario, err)
} }
// validate traffic from kube client
actionSet := sets.NewString() actionSet := sets.NewString()
for _, action := range mockClient.Actions() { for _, action := range mockClient.Actions() {
actionSet.Insert(strings.Join([]string{action.GetVerb(), action.GetResource(), action.GetSubresource()}, "-")) actionSet.Insert(strings.Join([]string{action.GetVerb(), action.GetResource(), action.GetSubresource()}, "-"))
} }
if !actionSet.HasAll(testInput.expectedActionSet.List()...) { if !actionSet.Equal(testInput.kubeClientActionSet) {
t.Errorf("scenario %s - Expected actions:\n%v\n but got:\n%v\nDifference:\n%v", scenario, testInput.expectedActionSet, actionSet, testInput.expectedActionSet.Difference(actionSet)) t.Errorf("scenario %s - mock client expected actions:\n%v\n but got:\n%v\nDifference:\n%v", scenario,
testInput.kubeClientActionSet, actionSet, testInput.kubeClientActionSet.Difference(actionSet))
} }
if !testInput.expectedActionSet.HasAll(actionSet.List()...) {
t.Errorf("scenario %s - Expected actions:\n%v\n but got:\n%v\nDifference:\n%v", scenario, testInput.expectedActionSet, actionSet, actionSet.Difference(testInput.expectedActionSet)) // validate traffic from dynamic client
actionSet = sets.NewString()
for _, action := range testHandler.actions {
actionSet.Insert(action.String())
}
if !actionSet.Equal(testInput.dynamicClientActionSet) {
t.Errorf("scenario %s - dynamic client expected actions:\n%v\n but got:\n%v\nDifference:\n%v", scenario,
testInput.dynamicClientActionSet, actionSet, testInput.dynamicClientActionSet.Difference(actionSet))
} }
} }
} }
@ -218,7 +224,7 @@ func TestSyncNamespaceThatIsActive(t *testing.T) {
Phase: api.NamespaceActive, Phase: api.NamespaceActive,
}, },
} }
err := syncNamespace(mockClient, &unversioned.APIVersions{}, testNamespace) err := syncNamespace(mockClient, nil, operationNotSupportedCache{}, testGroupVersionResources(), testNamespace, api.FinalizerKubernetes)
if err != nil { if err != nil {
t.Errorf("Unexpected error when synching namespace %v", err) t.Errorf("Unexpected error when synching namespace %v", err)
} }
@ -226,3 +232,51 @@ func TestSyncNamespaceThatIsActive(t *testing.T) {
t.Errorf("Expected no action from controller, but got: %v", mockClient.Actions()) t.Errorf("Expected no action from controller, but got: %v", mockClient.Actions())
} }
} }
// testServerAndClientConfig returns a server that listens and a config that can reference it
func testServerAndClientConfig(handler func(http.ResponseWriter, *http.Request)) (*httptest.Server, *restclient.Config) {
srv := httptest.NewServer(http.HandlerFunc(handler))
config := &restclient.Config{
Host: srv.URL,
}
return srv, config
}
// fakeAction records information about requests to aid in testing.
type fakeAction struct {
method string
path string
}
// String returns method=path to aid in testing
func (f *fakeAction) String() string {
return strings.Join([]string{f.method, f.path}, "=")
}
// fakeActionHandler holds a list of fakeActions received
type fakeActionHandler struct {
// statusCode returned by this handler
statusCode int
lock sync.Mutex
actions []fakeAction
}
// ServeHTTP logs the action that occurred and always returns the associated status code
func (f *fakeActionHandler) ServeHTTP(response http.ResponseWriter, request *http.Request) {
f.lock.Lock()
defer f.lock.Unlock()
f.actions = append(f.actions, fakeAction{method: request.Method, path: request.URL.Path})
response.WriteHeader(f.statusCode)
response.Write([]byte("{\"kind\": \"List\"}"))
}
// testGroupVersionResources returns a mocked up set of resources across different api groups for testing namespace controller.
func testGroupVersionResources() []unversioned.GroupVersionResource {
results := []unversioned.GroupVersionResource{}
results = append(results, unversioned.GroupVersionResource{Group: "", Version: "v1", Resource: "pods"})
results = append(results, unversioned.GroupVersionResource{Group: "", Version: "v1", Resource: "services"})
results = append(results, unversioned.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "deployments"})
return results
}

View File

@ -18,12 +18,17 @@ package namespace
import ( import (
"fmt" "fmt"
"strings"
"time"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/v1"
clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset"
unversionedextensions "k8s.io/kubernetes/pkg/client/typed/generated/extensions/unversioned" "k8s.io/kubernetes/pkg/client/typed/dynamic"
client "k8s.io/kubernetes/pkg/client/unversioned"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/util/sets" "k8s.io/kubernetes/pkg/util/sets"
"github.com/golang/glog" "github.com/golang/glog"
@ -38,6 +43,29 @@ func (e *contentRemainingError) Error() string {
return fmt.Sprintf("some content remains in the namespace, estimate %d seconds before it is removed", e.Estimate) return fmt.Sprintf("some content remains in the namespace, estimate %d seconds before it is removed", e.Estimate)
} }
// operation is used for caching if an operation is supported on a dynamic client.
type operation string
const (
operationDeleteCollection operation = "deleteCollection"
operationList operation = "list"
)
// operationKey is an entry in a cache.
type operationKey struct {
op operation
gvr unversioned.GroupVersionResource
}
// operationNotSupportedCache is a simple cache to remember if an operation is not supported for a resource.
// if the operationKey maps to true, it means the operation is not supported.
type operationNotSupportedCache map[operationKey]bool
// isSupported returns true if the operation is supported
func (o operationNotSupportedCache) isSupported(key operationKey) bool {
return !o[key]
}
// updateNamespaceFunc is a function that makes an update to a namespace // updateNamespaceFunc is a function that makes an update to a namespace
type updateNamespaceFunc func(kubeClient clientset.Interface, namespace *api.Namespace) (*api.Namespace, error) type updateNamespaceFunc func(kubeClient clientset.Interface, namespace *api.Namespace) (*api.Namespace, error)
@ -58,7 +86,6 @@ func retryOnConflictError(kubeClient clientset.Interface, namespace *api.Namespa
return nil, err return nil, err
} }
} }
return
} }
// updateNamespaceStatusFunc will verify that the status of the namespace is correct // updateNamespaceStatusFunc will verify that the status of the namespace is correct
@ -78,14 +105,21 @@ func finalized(namespace *api.Namespace) bool {
return len(namespace.Spec.Finalizers) == 0 return len(namespace.Spec.Finalizers) == 0
} }
// finalizeNamespaceFunc removes the kubernetes token and finalizes the namespace // finalizeNamespaceFunc returns a function that knows how to finalize a namespace for specified token.
func finalizeNamespaceFunc(kubeClient clientset.Interface, namespace *api.Namespace) (*api.Namespace, error) { func finalizeNamespaceFunc(finalizerToken api.FinalizerName) updateNamespaceFunc {
return func(kubeClient clientset.Interface, namespace *api.Namespace) (*api.Namespace, error) {
return finalizeNamespace(kubeClient, namespace, finalizerToken)
}
}
// finalizeNamespace removes the specified finalizerToken and finalizes the namespace
func finalizeNamespace(kubeClient clientset.Interface, namespace *api.Namespace, finalizerToken api.FinalizerName) (*api.Namespace, error) {
namespaceFinalize := api.Namespace{} namespaceFinalize := api.Namespace{}
namespaceFinalize.ObjectMeta = namespace.ObjectMeta namespaceFinalize.ObjectMeta = namespace.ObjectMeta
namespaceFinalize.Spec = namespace.Spec namespaceFinalize.Spec = namespace.Spec
finalizerSet := sets.NewString() finalizerSet := sets.NewString()
for i := range namespace.Spec.Finalizers { for i := range namespace.Spec.Finalizers {
if namespace.Spec.Finalizers[i] != api.FinalizerKubernetes { if namespace.Spec.Finalizers[i] != finalizerToken {
finalizerSet.Insert(string(namespace.Spec.Finalizers[i])) finalizerSet.Insert(string(namespace.Spec.Finalizers[i]))
} }
} }
@ -103,99 +137,207 @@ func finalizeNamespaceFunc(kubeClient clientset.Interface, namespace *api.Namesp
return namespace, err return namespace, err
} }
// deleteAllContent will delete all content known to the system in a namespace. It returns an estimate // deleteCollection is a helper function that will delete the collection of resources
// of the time remaining before the remaining resources are deleted. If estimate > 0 not all resources // it returns true if the operation was supported on the server.
// are guaranteed to be gone. // it returns an error if the operation was supported on the server but was unable to complete.
// TODO: this should use discovery to delete arbitrary namespace content func deleteCollection(
func deleteAllContent(kubeClient clientset.Interface, versions *unversioned.APIVersions, namespace string, before unversioned.Time) (estimate int64, err error) { dynamicClient *dynamic.Client,
err = deleteServiceAccounts(kubeClient, namespace) opCache operationNotSupportedCache,
gvr unversioned.GroupVersionResource,
namespace string,
) (bool, error) {
glog.V(4).Infof("namespace controller - deleteCollection - namespace: %s, gvr: %v", namespace, gvr)
key := operationKey{op: operationDeleteCollection, gvr: gvr}
if !opCache.isSupported(key) {
glog.V(4).Infof("namespace controller - deleteCollection ignored since not supported - namespace: %s, gvr: %v", namespace, gvr)
return false, nil
}
apiResource := unversioned.APIResource{Name: gvr.Resource, Namespaced: true}
err := dynamicClient.Resource(&apiResource, namespace).DeleteCollection(nil, v1.ListOptions{})
if err == nil {
return true, nil
}
// this is strange, but we need to special case for both MethodNotSupported and NotFound errors
// TODO: https://github.com/kubernetes/kubernetes/issues/22413
// we have a resource returned in the discovery API that supports no top-level verbs:
// /apis/extensions/v1beta1/namespaces/default/replicationcontrollers
// when working with this resource type, we will get a literal not found error rather than expected method not supported
// remember next time that this resource does not support delete collection...
if errors.IsMethodNotSupported(err) || errors.IsNotFound(err) {
glog.V(4).Infof("namespace controller - deleteCollection not supported - namespace: %s, gvr: %v", namespace, gvr)
opCache[key] = true
return false, nil
}
glog.V(4).Infof("namespace controller - deleteCollection unexpected error - namespace: %s, gvr: %v, error: %v", namespace, gvr, err)
return true, err
}
// listCollection will list the items in the specified namespace
// it returns the following:
// the list of items in the collection (if found)
// a boolean if the operation is supported
// an error if the operation is supported but could not be completed.
func listCollection(
dynamicClient *dynamic.Client,
opCache operationNotSupportedCache,
gvr unversioned.GroupVersionResource,
namespace string,
) (*runtime.UnstructuredList, bool, error) {
glog.V(4).Infof("namespace controller - listCollection - namespace: %s, gvr: %v", namespace, gvr)
key := operationKey{op: operationList, gvr: gvr}
if !opCache.isSupported(key) {
glog.V(4).Infof("namespace controller - listCollection ignored since not supported - namespace: %s, gvr: %v", namespace, gvr)
return nil, false, nil
}
apiResource := unversioned.APIResource{Name: gvr.Resource, Namespaced: true}
unstructuredList, err := dynamicClient.Resource(&apiResource, namespace).List(v1.ListOptions{})
if err == nil {
return unstructuredList, true, nil
}
// this is strange, but we need to special case for both MethodNotSupported and NotFound errors
// TODO: https://github.com/kubernetes/kubernetes/issues/22413
// we have a resource returned in the discovery API that supports no top-level verbs:
// /apis/extensions/v1beta1/namespaces/default/replicationcontrollers
// when working with this resource type, we will get a literal not found error rather than expected method not supported
// remember next time that this resource does not support delete collection...
if errors.IsMethodNotSupported(err) || errors.IsNotFound(err) {
glog.V(4).Infof("namespace controller - listCollection not supported - namespace: %s, gvr: %v", namespace, gvr)
opCache[key] = true
return nil, false, nil
}
return nil, true, err
}
// deleteEachItem is a helper function that will list the collection of resources and delete each item 1 by 1.
func deleteEachItem(
dynamicClient *dynamic.Client,
opCache operationNotSupportedCache,
gvr unversioned.GroupVersionResource,
namespace string,
) error {
glog.V(4).Infof("namespace controller - deleteEachItem - namespace: %s, gvr: %v", namespace, gvr)
unstructuredList, listSupported, err := listCollection(dynamicClient, opCache, gvr, namespace)
if err != nil {
return err
}
if !listSupported {
return nil
}
apiResource := unversioned.APIResource{Name: gvr.Resource, Namespaced: true}
for _, item := range unstructuredList.Items {
if err = dynamicClient.Resource(&apiResource, namespace).Delete(item.Name, nil); err != nil && !errors.IsNotFound(err) && !errors.IsMethodNotSupported(err) {
return err
}
}
return nil
}
// deleteAllContentForGroupVersionResource will use the dynamic client to delete each resource identified in gvr.
// It returns an estimate of the time remaining before the remaing resources are deleted.
// If estimate > 0, not all resources are guaranteed to be gone.
func deleteAllContentForGroupVersionResource(
kubeClient clientset.Interface,
clientPool dynamic.ClientPool,
opCache operationNotSupportedCache,
gvr unversioned.GroupVersionResource,
namespace string,
namespaceDeletedAt unversioned.Time,
) (int64, error) {
glog.V(4).Infof("namespace controller - deleteAllContentForGroupVersionResource - namespace: %s, gvr: %v", namespace, gvr)
// estimate how long it will take for the resource to be deleted (needed for objects that support graceful delete)
estimate, err := estimateGracefulTermination(kubeClient, gvr, namespace, namespaceDeletedAt)
if err != nil {
glog.V(4).Infof("namespace controller - deleteAllContentForGroupVersionResource - unable to estimate - namespace: %s, gvr: %v, err: %v", namespace, gvr, err)
return estimate, err
}
glog.V(4).Infof("namespace controller - deleteAllContentForGroupVersionResource - estimate - namespace: %s, gvr: %v, estimate: %v", namespace, gvr, estimate)
// get a client for this group version...
dynamicClient, err := clientPool.ClientForGroupVersion(gvr.GroupVersion())
if err != nil {
glog.V(4).Infof("namespace controller - deleteAllContentForGroupVersionResource - unable to get client - namespace: %s, gvr: %v, err: %v", namespace, gvr, err)
return estimate, err
}
// first try to delete the entire collection
deleteCollectionSupported, err := deleteCollection(dynamicClient, opCache, gvr, namespace)
if err != nil { if err != nil {
return estimate, err return estimate, err
} }
err = deleteServices(kubeClient, namespace)
if err != nil { // delete collection was not supported, so we list and delete each item...
return estimate, err if !deleteCollectionSupported {
} err = deleteEachItem(dynamicClient, opCache, gvr, namespace)
err = deleteReplicationControllers(kubeClient, namespace)
if err != nil {
return estimate, err
}
estimate, err = deletePods(kubeClient, namespace, before)
if err != nil {
return estimate, err
}
err = deleteSecrets(kubeClient, namespace)
if err != nil {
return estimate, err
}
err = deleteConfigMaps(kubeClient, namespace)
if err != nil {
return estimate, err
}
err = deletePersistentVolumeClaims(kubeClient, namespace)
if err != nil {
return estimate, err
}
err = deleteLimitRanges(kubeClient, namespace)
if err != nil {
return estimate, err
}
err = deleteResourceQuotas(kubeClient, namespace)
if err != nil {
return estimate, err
}
err = deleteEvents(kubeClient, namespace)
if err != nil {
return estimate, err
}
// If experimental mode, delete all experimental resources for the namespace.
if containsVersion(versions, "extensions/v1beta1") {
resources, err := kubeClient.Discovery().ServerResourcesForGroupVersion("extensions/v1beta1")
if err != nil { if err != nil {
return estimate, err return estimate, err
} }
if containsResource(resources, "horizontalpodautoscalers") { }
err = deleteHorizontalPodAutoscalers(kubeClient.Extensions(), namespace)
if err != nil { // verify there are no more remaining items
return estimate, err // it is not an error condition for there to be remaining items if local estimate is non-zero
} glog.V(4).Infof("namespace controller - deleteAllContentForGroupVersionResource - checking for no more items in namespace: %s, gvr: %v", namespace, gvr)
} unstructuredList, listSupported, err := listCollection(dynamicClient, opCache, gvr, namespace)
if containsResource(resources, "ingresses") { if err != nil {
err = deleteIngress(kubeClient.Extensions(), namespace) glog.V(4).Infof("namespace controller - deleteAllContentForGroupVersionResource - error verifying no items in namespace: %s, gvr: %v, err: %v", namespace, gvr, err)
if err != nil { return estimate, err
return estimate, err }
} if !listSupported {
} return estimate, nil
if containsResource(resources, "daemonsets") { }
err = deleteDaemonSets(kubeClient.Extensions(), namespace) glog.V(4).Infof("namespace controller - deleteAllContentForGroupVersionResource - items remaining - namespace: %s, gvr: %v, items: %v", namespace, gvr, len(unstructuredList.Items))
if err != nil { if len(unstructuredList.Items) != 0 && estimate == int64(0) {
return estimate, err return estimate, fmt.Errorf("unexpected items still remain in namespace: %s for gvr: %v", namespace, gvr)
}
}
if containsResource(resources, "jobs") {
err = deleteJobs(kubeClient.Extensions(), namespace)
if err != nil {
return estimate, err
}
}
if containsResource(resources, "deployments") {
err = deleteDeployments(kubeClient.Extensions(), namespace)
if err != nil {
return estimate, err
}
}
if containsResource(resources, "replicasets") {
err = deleteReplicaSets(kubeClient.Extensions(), namespace)
if err != nil {
return estimate, err
}
}
} }
return estimate, nil return estimate, nil
} }
// deleteAllContent will use the dynamic client to delete each resource identified in groupVersionResources.
// It returns an estimate of the time remaining before the remaing resources are deleted.
// If estimate > 0, not all resources are guaranteed to be gone.
func deleteAllContent(
kubeClient clientset.Interface,
clientPool dynamic.ClientPool,
opCache operationNotSupportedCache,
groupVersionResources []unversioned.GroupVersionResource,
namespace string,
namespaceDeletedAt unversioned.Time,
) (int64, error) {
estimate := int64(0)
glog.V(4).Infof("namespace controller - deleteAllContent - namespace: %s, gvrs: %v", namespace, groupVersionResources)
// iterate over each group version, and attempt to delete all of its resources
for _, gvr := range groupVersionResources {
gvrEstimate, err := deleteAllContentForGroupVersionResource(kubeClient, clientPool, opCache, gvr, namespace, namespaceDeletedAt)
if err != nil {
return estimate, err
}
if gvrEstimate > estimate {
estimate = gvrEstimate
}
}
glog.V(4).Infof("namespace controller - deleteAllContent - namespace: %s, estimate: %v", namespace, estimate)
return estimate, nil
}
// syncNamespace orchestrates deletion of a Namespace and its associated content. // syncNamespace orchestrates deletion of a Namespace and its associated content.
func syncNamespace(kubeClient clientset.Interface, versions *unversioned.APIVersions, namespace *api.Namespace) error { func syncNamespace(
kubeClient clientset.Interface,
clientPool dynamic.ClientPool,
opCache operationNotSupportedCache,
groupVersionResources []unversioned.GroupVersionResource,
namespace *api.Namespace,
finalizerToken api.FinalizerName,
) error {
if namespace.DeletionTimestamp == nil { if namespace.DeletionTimestamp == nil {
return nil return nil
} }
@ -211,7 +353,7 @@ func syncNamespace(kubeClient clientset.Interface, versions *unversioned.APIVers
return err return err
} }
glog.V(4).Infof("Syncing namespace %s", namespace.Name) glog.V(4).Infof("namespace controller - syncNamespace - namespace: %s, finalizerToken: %s", namespace.Name, finalizerToken)
// ensure that the status is up to date on the namespace // ensure that the status is up to date on the namespace
// if we get a not found error, we assume the namespace is truly gone // if we get a not found error, we assume the namespace is truly gone
@ -233,7 +375,7 @@ func syncNamespace(kubeClient clientset.Interface, versions *unversioned.APIVers
} }
// there may still be content for us to remove // there may still be content for us to remove
estimate, err := deleteAllContent(kubeClient, versions, namespace.Name, *namespace.DeletionTimestamp) estimate, err := deleteAllContent(kubeClient, clientPool, opCache, groupVersionResources, namespace.Name, *namespace.DeletionTimestamp)
if err != nil { if err != nil {
return err return err
} }
@ -242,7 +384,7 @@ func syncNamespace(kubeClient clientset.Interface, versions *unversioned.APIVers
} }
// we have removed content, so mark it finalized by us // we have removed content, so mark it finalized by us
result, err := retryOnConflictError(kubeClient, namespace, finalizeNamespaceFunc) result, err := retryOnConflictError(kubeClient, namespace, finalizeNamespaceFunc(finalizerToken))
if err != nil { if err != nil {
// in normal practice, this should not be possible, but if a deployment is running // in normal practice, this should not be possible, but if a deployment is running
// two controllers to do namespace deletion that share a common finalizer token it's // two controllers to do namespace deletion that share a common finalizer token it's
@ -264,125 +406,75 @@ func syncNamespace(kubeClient clientset.Interface, versions *unversioned.APIVers
return nil return nil
} }
func deleteLimitRanges(kubeClient clientset.Interface, ns string) error { // estimateGrracefulTermination will estimate the graceful termination required for the specific entity in the namespace
return kubeClient.Core().LimitRanges(ns).DeleteCollection(nil, api.ListOptions{}) func estimateGracefulTermination(kubeClient clientset.Interface, groupVersionResource unversioned.GroupVersionResource, ns string, namespaceDeletedAt unversioned.Time) (int64, error) {
} groupResource := groupVersionResource.GroupResource()
glog.V(4).Infof("namespace controller - estimateGracefulTermination - group %s, resource: %s", groupResource.Group, groupResource.Resource)
func deleteResourceQuotas(kubeClient clientset.Interface, ns string) error { estimate := int64(0)
return kubeClient.Core().ResourceQuotas(ns).DeleteCollection(nil, api.ListOptions{}) var err error
} switch groupResource {
case unversioned.GroupResource{Group: "", Resource: "pods"}:
func deleteServiceAccounts(kubeClient clientset.Interface, ns string) error { estimate, err = estimateGracefulTerminationForPods(kubeClient, ns)
return kubeClient.Core().ServiceAccounts(ns).DeleteCollection(nil, api.ListOptions{}) }
}
func deleteServices(kubeClient clientset.Interface, ns string) error {
items, err := kubeClient.Core().Services(ns).List(api.ListOptions{})
if err != nil { if err != nil {
return err return estimate, err
} }
for i := range items.Items { // determine if the estimate is greater than the deletion timestamp
err := kubeClient.Core().Services(ns).Delete(items.Items[i].Name, nil) duration := time.Since(namespaceDeletedAt.Time)
if err != nil && !errors.IsNotFound(err) { allowedEstimate := time.Duration(estimate) * time.Second
return err if duration >= allowedEstimate {
} estimate = int64(0)
} }
return nil return estimate, nil
} }
func deleteReplicationControllers(kubeClient clientset.Interface, ns string) error { // estimateGracefulTerminationForPods determines the graceful termination period for pods in the namespace
return kubeClient.Core().ReplicationControllers(ns).DeleteCollection(nil, api.ListOptions{}) func estimateGracefulTerminationForPods(kubeClient clientset.Interface, ns string) (int64, error) {
} glog.V(4).Infof("namespace controller - estimateGracefulTerminationForPods - namespace %s", ns)
estimate := int64(0)
func deletePods(kubeClient clientset.Interface, ns string, before unversioned.Time) (int64, error) {
items, err := kubeClient.Core().Pods(ns).List(api.ListOptions{}) items, err := kubeClient.Core().Pods(ns).List(api.ListOptions{})
if err != nil { if err != nil {
return 0, err return estimate, err
} }
expired := unversioned.Now().After(before.Time)
var deleteOptions *api.DeleteOptions
if expired {
deleteOptions = api.NewDeleteOptions(0)
}
estimate := int64(0)
for i := range items.Items { for i := range items.Items {
// filter out terminal pods
phase := items.Items[i].Status.Phase
if api.PodSucceeded == phase || api.PodFailed == phase {
continue
}
if items.Items[i].Spec.TerminationGracePeriodSeconds != nil { if items.Items[i].Spec.TerminationGracePeriodSeconds != nil {
grace := *items.Items[i].Spec.TerminationGracePeriodSeconds grace := *items.Items[i].Spec.TerminationGracePeriodSeconds
if grace > estimate { if grace > estimate {
estimate = grace estimate = grace
} }
} }
err := kubeClient.Core().Pods(ns).Delete(items.Items[i].Name, deleteOptions)
if err != nil && !errors.IsNotFound(err) {
return 0, err
}
}
if expired {
estimate = 0
} }
return estimate, nil return estimate, nil
} }
func deleteEvents(kubeClient clientset.Interface, ns string) error { // ServerPreferredNamespacedGroupVersionResources uses the specified client to discover the set of preferred groupVersionResources that are namespaced
return kubeClient.Core().Events(ns).DeleteCollection(nil, api.ListOptions{}) func ServerPreferredNamespacedGroupVersionResources(discoveryClient client.DiscoveryInterface) ([]unversioned.GroupVersionResource, error) {
} results := []unversioned.GroupVersionResource{}
serverGroupList, err := discoveryClient.ServerGroups()
func deleteSecrets(kubeClient clientset.Interface, ns string) error { if err != nil {
return kubeClient.Core().Secrets(ns).DeleteCollection(nil, api.ListOptions{}) return results, err
} }
for _, apiGroup := range serverGroupList.Groups {
func deleteConfigMaps(kubeClient clientset.Interface, ns string) error { preferredVersion := apiGroup.PreferredVersion
return kubeClient.Core().ConfigMaps(ns).DeleteCollection(nil, api.ListOptions{}) apiResourceList, err := discoveryClient.ServerResourcesForGroupVersion(preferredVersion.GroupVersion)
} if err != nil {
return results, err
func deletePersistentVolumeClaims(kubeClient clientset.Interface, ns string) error { }
return kubeClient.Core().PersistentVolumeClaims(ns).DeleteCollection(nil, api.ListOptions{}) groupVersion := unversioned.GroupVersion{Group: apiGroup.Name, Version: preferredVersion.Version}
} for _, apiResource := range apiResourceList.APIResources {
if !apiResource.Namespaced {
func deleteHorizontalPodAutoscalers(expClient unversionedextensions.ExtensionsInterface, ns string) error { continue
return expClient.HorizontalPodAutoscalers(ns).DeleteCollection(nil, api.ListOptions{}) }
} if strings.Contains(apiResource.Name, "/") {
continue
func deleteDaemonSets(expClient unversionedextensions.ExtensionsInterface, ns string) error { }
return expClient.DaemonSets(ns).DeleteCollection(nil, api.ListOptions{}) results = append(results, groupVersion.WithResource(apiResource.Name))
}
func deleteJobs(expClient unversionedextensions.ExtensionsInterface, ns string) error {
return expClient.Jobs(ns).DeleteCollection(nil, api.ListOptions{})
}
func deleteDeployments(expClient unversionedextensions.ExtensionsInterface, ns string) error {
return expClient.Deployments(ns).DeleteCollection(nil, api.ListOptions{})
}
func deleteReplicaSets(expClient unversionedextensions.ExtensionsInterface, ns string) error {
return expClient.ReplicaSets(ns).DeleteCollection(nil, api.ListOptions{})
}
func deleteIngress(expClient unversionedextensions.ExtensionsInterface, ns string) error {
return expClient.Ingresses(ns).DeleteCollection(nil, api.ListOptions{})
}
// TODO: this is duplicated logic. Move it somewhere central?
func containsVersion(versions *unversioned.APIVersions, version string) bool {
for ix := range versions.Versions {
if versions.Versions[ix] == version {
return true
} }
} }
return false return results, nil
}
// TODO: this is duplicated logic. Move it somewhere central?
func containsResource(resources *unversioned.APIResourceList, resourceName string) bool {
if resources == nil {
return false
}
for ix := range resources.APIResources {
resource := resources.APIResources[ix]
if resource.Name == resourceName {
return true
}
}
return false
} }

View File

@ -23,6 +23,8 @@ import (
"time" "time"
"k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/errors"
"k8s.io/kubernetes/pkg/util/intstr"
"k8s.io/kubernetes/pkg/util/wait" "k8s.io/kubernetes/pkg/util/wait"
. "github.com/onsi/ginkgo" . "github.com/onsi/ginkgo"
@ -74,6 +76,109 @@ func extinguish(f *Framework, totalNS int, maxAllowedAfterDel int, maxSeconds in
})) }))
} }
func ensurePodsAreRemovedWhenNamespaceIsDeleted(f *Framework) {
var err error
By("Creating a test namespace")
namespace, err := f.CreateNamespace("nsdeletetest", nil)
Expect(err).NotTo(HaveOccurred())
By("Waiting for a default service account to be provisioned in namespace")
err = waitForDefaultServiceAccountInNamespace(f.Client, namespace.Name)
Expect(err).NotTo(HaveOccurred())
By("Creating a pod in the namespace")
pod := &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "test-pod",
},
Spec: api.PodSpec{
Containers: []api.Container{
{
Name: "nginx",
Image: "gcr.io/google_containers/pause:2.0",
},
},
},
}
pod, err = f.Client.Pods(namespace.Name).Create(pod)
Expect(err).NotTo(HaveOccurred())
By("Waiting for the pod to have running status")
expectNoError(waitForPodRunningInNamespace(f.Client, pod.Name, pod.Namespace))
By("Deleting the namespace")
err = f.Client.Namespaces().Delete(namespace.Name)
Expect(err).NotTo(HaveOccurred())
By("Waiting for the namespace to be removed.")
maxWaitSeconds := int64(60) + *pod.Spec.TerminationGracePeriodSeconds
expectNoError(wait.Poll(1*time.Second, time.Duration(maxWaitSeconds)*time.Second,
func() (bool, error) {
_, err = f.Client.Namespaces().Get(namespace.Name)
if err != nil && errors.IsNotFound(err) {
return true, nil
}
return false, nil
}))
By("Verifying there is no pod in the namespace")
_, err = f.Client.Pods(namespace.Name).Get(pod.Name)
Expect(err).To(HaveOccurred())
}
func ensureServicesAreRemovedWhenNamespaceIsDeleted(f *Framework) {
var err error
By("Creating a test namespace")
namespace, err := f.CreateNamespace("nsdeletetest", nil)
Expect(err).NotTo(HaveOccurred())
By("Waiting for a default service account to be provisioned in namespace")
err = waitForDefaultServiceAccountInNamespace(f.Client, namespace.Name)
Expect(err).NotTo(HaveOccurred())
By("Creating a service in the namespace")
serviceName := "test-service"
labels := map[string]string{
"foo": "bar",
"baz": "blah",
}
service := &api.Service{
ObjectMeta: api.ObjectMeta{
Name: serviceName,
},
Spec: api.ServiceSpec{
Selector: labels,
Ports: []api.ServicePort{{
Port: 80,
TargetPort: intstr.FromInt(80),
}},
},
}
service, err = f.Client.Services(namespace.Name).Create(service)
Expect(err).NotTo(HaveOccurred())
By("Deleting the namespace")
err = f.Client.Namespaces().Delete(namespace.Name)
Expect(err).NotTo(HaveOccurred())
By("Waiting for the namespace to be removed.")
maxWaitSeconds := int64(60)
expectNoError(wait.Poll(1*time.Second, time.Duration(maxWaitSeconds)*time.Second,
func() (bool, error) {
_, err = f.Client.Namespaces().Get(namespace.Name)
if err != nil && errors.IsNotFound(err) {
return true, nil
}
return false, nil
}))
By("Verifying there is no service in the namespace")
_, err = f.Client.Services(namespace.Name).Get(service.Name)
Expect(err).To(HaveOccurred())
}
// This test must run [Serial] due to the impact of running other parallel // This test must run [Serial] due to the impact of running other parallel
// tests can have on its performance. Each test that follows the common // tests can have on its performance. Each test that follows the common
// test framework follows this pattern: // test framework follows this pattern:
@ -106,10 +211,17 @@ var _ = Describe("Namespaces [Serial]", func() {
f := NewDefaultFramework("namespaces") f := NewDefaultFramework("namespaces")
It("should ensure that all pods are removed when a namespace is deleted.",
func() { ensurePodsAreRemovedWhenNamespaceIsDeleted(f) })
It("should ensure that all services are removed when a namespace is deleted.",
func() { ensureServicesAreRemovedWhenNamespaceIsDeleted(f) })
It("should delete fast enough (90 percent of 100 namespaces in 150 seconds)", It("should delete fast enough (90 percent of 100 namespaces in 150 seconds)",
func() { extinguish(f, 100, 10, 150) }) func() { extinguish(f, 100, 10, 150) })
// On hold until etcd3; see #7372 // On hold until etcd3; see #7372
It("should always delete fast (ALL of 100 namespaces in 150 seconds) [Feature:ComprehensiveNamespaceDraining]", It("should always delete fast (ALL of 100 namespaces in 150 seconds) [Feature:ComprehensiveNamespaceDraining]",
func() { extinguish(f, 100, 0, 150) }) func() { extinguish(f, 100, 0, 150) })
}) })