diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 16afe173aae..eec53f02573 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -241,7 +241,8 @@ func StartControllers(s *options.CMServer, kubeClient *client.Client, kubeconfig glog.Fatalf("Failed to get supported resources from server: %v", err) } - namespacecontroller.NewNamespaceController(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "namespace-controller")), versions, s.NamespaceSyncPeriod).Run() + namespaceController := namespacecontroller.NewNamespaceController(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "namespace-controller")), &unversioned.APIVersions{}, s.NamespaceSyncPeriod) + go namespaceController.Run(s.ConcurrentNamespaceSyncs, wait.NeverStop) groupVersion := "extensions/v1beta1" resources, found := resourceMap[groupVersion] diff --git a/cmd/kube-controller-manager/app/options/options.go b/cmd/kube-controller-manager/app/options/options.go index 3fad72cb7b6..e6846ca99a3 100644 --- a/cmd/kube-controller-manager/app/options/options.go +++ b/cmd/kube-controller-manager/app/options/options.go @@ -44,6 +44,7 @@ type CMServer struct { ConcurrentJobSyncs int ConcurrentResourceQuotaSyncs int ConcurrentDeploymentSyncs int + ConcurrentNamespaceSyncs int ServiceSyncPeriod time.Duration NodeSyncPeriod time.Duration ResourceQuotaSyncPeriod time.Duration @@ -105,6 +106,7 @@ func NewCMServer() *CMServer { ConcurrentJobSyncs: 5, ConcurrentResourceQuotaSyncs: 5, ConcurrentDeploymentSyncs: 5, + ConcurrentNamespaceSyncs: 2, ServiceSyncPeriod: 5 * time.Minute, NodeSyncPeriod: 10 * time.Second, ResourceQuotaSyncPeriod: 5 * time.Minute, @@ -143,10 +145,11 @@ func (s *CMServer) AddFlags(fs *pflag.FlagSet) { fs.StringVar(&s.CloudProvider, "cloud-provider", s.CloudProvider, "The provider for cloud services. Empty string for no provider.") fs.StringVar(&s.CloudConfigFile, "cloud-config", s.CloudConfigFile, "The path to the cloud provider configuration file. Empty string for no configuration file.") fs.IntVar(&s.ConcurrentEndpointSyncs, "concurrent-endpoint-syncs", s.ConcurrentEndpointSyncs, "The number of endpoint syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load") - fs.IntVar(&s.ConcurrentRCSyncs, "concurrent_rc_syncs", s.ConcurrentRCSyncs, "The number of replication controllers that are allowed to sync concurrently. Larger number = more reponsive replica management, but more CPU (and network) load") - fs.IntVar(&s.ConcurrentRSSyncs, "concurrent-replicaset-syncs", s.ConcurrentRSSyncs, "The number of replica sets that are allowed to sync concurrently. Larger number = more reponsive replica management, but more CPU (and network) load") + fs.IntVar(&s.ConcurrentRCSyncs, "concurrent_rc_syncs", s.ConcurrentRCSyncs, "The number of replication controllers that are allowed to sync concurrently. Larger number = more responsive replica management, but more CPU (and network) load") + fs.IntVar(&s.ConcurrentRSSyncs, "concurrent-replicaset-syncs", s.ConcurrentRSSyncs, "The number of replica sets that are allowed to sync concurrently. Larger number = more responsive replica management, but more CPU (and network) load") fs.IntVar(&s.ConcurrentResourceQuotaSyncs, "concurrent-resource-quota-syncs", s.ConcurrentResourceQuotaSyncs, "The number of resource quotas that are allowed to sync concurrently. 
Larger number = more responsive quota management, but more CPU (and network) load") - fs.IntVar(&s.ConcurrentDeploymentSyncs, "concurrent-deployment-syncs", s.ConcurrentDeploymentSyncs, "The number of deployment objects that are allowed to sync concurrently. Larger number = more reponsive deployments, but more CPU (and network) load") + fs.IntVar(&s.ConcurrentDeploymentSyncs, "concurrent-deployment-syncs", s.ConcurrentDeploymentSyncs, "The number of deployment objects that are allowed to sync concurrently. Larger number = more responsive deployments, but more CPU (and network) load") + fs.IntVar(&s.ConcurrentNamespaceSyncs, "concurrent-namespace-syncs", s.ConcurrentNamespaceSyncs, "The number of namespace objects that are allowed to sync concurrently. Larger number = more responsive namespace termination, but more CPU (and network) load") fs.DurationVar(&s.ServiceSyncPeriod, "service-sync-period", s.ServiceSyncPeriod, "The period for syncing services with their external load balancers") fs.DurationVar(&s.NodeSyncPeriod, "node-sync-period", s.NodeSyncPeriod, ""+ "The period for syncing nodes from cloudprovider. Longer periods will result in "+ diff --git a/contrib/mesos/pkg/controllermanager/controllermanager.go b/contrib/mesos/pkg/controllermanager/controllermanager.go index ea975830322..06a883f34b1 100644 --- a/contrib/mesos/pkg/controllermanager/controllermanager.go +++ b/contrib/mesos/pkg/controllermanager/controllermanager.go @@ -197,7 +197,7 @@ func (s *CMServer) Run(_ []string) error { } namespaceController := namespacecontroller.NewNamespaceController(clientset.NewForConfigOrDie(client.AddUserAgent(kubeconfig, "namespace-controller")), &unversioned.APIVersions{}, s.NamespaceSyncPeriod) - namespaceController.Run() + namespaceController.Run(s.ConcurrentNamespaceSyncs, wait.NeverStop) groupVersion := "extensions/v1beta1" resources, found := resourceMap[groupVersion] diff --git a/docs/admin/kube-controller-manager.md b/docs/admin/kube-controller-manager.md index 3f92655dcc2..3b346964635 100644 --- a/docs/admin/kube-controller-manager.md +++ b/docs/admin/kube-controller-manager.md @@ -61,11 +61,12 @@ kube-controller-manager --cloud-provider="": The provider for cloud services. Empty string for no provider. --cluster-cidr=: CIDR Range for Pods in cluster. --cluster-name="kubernetes": The instance prefix for the cluster - --concurrent-deployment-syncs=5: The number of deployment objects that are allowed to sync concurrently. Larger number = more reponsive deployments, but more CPU (and network) load + --concurrent-deployment-syncs=5: The number of deployment objects that are allowed to sync concurrently. Larger number = more responsive deployments, but more CPU (and network) load --concurrent-endpoint-syncs=5: The number of endpoint syncing operations that will be done concurrently. Larger number = faster endpoint updating, but more CPU (and network) load - --concurrent-replicaset-syncs=5: The number of replica sets that are allowed to sync concurrently. Larger number = more reponsive replica management, but more CPU (and network) load + --concurrent-namespace-syncs=2: The number of namespace objects that are allowed to sync concurrently. Larger number = more responsive namespace termination, but more CPU (and network) load + --concurrent-replicaset-syncs=5: The number of replica sets that are allowed to sync concurrently. 
Larger number = more responsive replica management, but more CPU (and network) load --concurrent-resource-quota-syncs=5: The number of resource quotas that are allowed to sync concurrently. Larger number = more responsive quota management, but more CPU (and network) load - --concurrent_rc_syncs=5: The number of replication controllers that are allowed to sync concurrently. Larger number = more reponsive replica management, but more CPU (and network) load + --concurrent_rc_syncs=5: The number of replication controllers that are allowed to sync concurrently. Larger number = more responsive replica management, but more CPU (and network) load --deleting-pods-burst=10: Number of nodes on which pods are bursty deleted in case of node failure. For more details look into RateLimiter. --deleting-pods-qps=0.1: Number of nodes per second on which pods are deleted in case of node failure. --deployment-controller-sync-period=30s: Period for syncing the deployments. @@ -104,7 +105,7 @@ kube-controller-manager --terminated-pod-gc-threshold=12500: Number of terminated pods that can exist before the terminated pod garbage collector starts deleting terminated pods. If <= 0, the terminated pod garbage collector is disabled. ``` -###### Auto generated by spf13/cobra on 5-Feb-2016 +###### Auto generated by spf13/cobra on 8-Feb-2016 diff --git a/hack/verify-flags/known-flags.txt b/hack/verify-flags/known-flags.txt index 558716b38c7..50bcf67b6b8 100644 --- a/hack/verify-flags/known-flags.txt +++ b/hack/verify-flags/known-flags.txt @@ -49,6 +49,7 @@ concurrent-deployment-syncs concurrent-endpoint-syncs concurrent-replicaset-syncs concurrent-resource-quota-syncs +concurrent-namespace-syncs config-sync-period configure-cbr0 conntrack-max diff --git a/pkg/controller/namespace/namespace_controller.go b/pkg/controller/namespace/namespace_controller.go index f3db7709619..1d35fc9ffa1 100644 --- a/pkg/controller/namespace/namespace_controller.go +++ b/pkg/controller/namespace/namespace_controller.go @@ -17,19 +17,18 @@ limitations under the License. 
package namespace import ( - "fmt" "time" "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/errors" "k8s.io/kubernetes/pkg/api/unversioned" "k8s.io/kubernetes/pkg/client/cache" clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" - extensions_unversioned "k8s.io/kubernetes/pkg/client/typed/generated/extensions/unversioned" + "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller/framework" "k8s.io/kubernetes/pkg/runtime" utilruntime "k8s.io/kubernetes/pkg/util/runtime" - "k8s.io/kubernetes/pkg/util/sets" + "k8s.io/kubernetes/pkg/util/wait" + "k8s.io/kubernetes/pkg/util/workqueue" "k8s.io/kubernetes/pkg/watch" "github.com/golang/glog" @@ -37,14 +36,29 @@ import ( // NamespaceController is responsible for performing actions dependent upon a namespace phase type NamespaceController struct { - controller *framework.Controller - StopEverything chan struct{} + // client that purges namespace content, must have list/delete privileges on all content + kubeClient clientset.Interface + // store that holds the namespaces + store cache.Store + // controller that observes the namespaces + controller *framework.Controller + // namespaces that have been queued up for processing by workers + queue *workqueue.Type + // list of versions to process + versions *unversioned.APIVersions } // NewNamespaceController creates a new NamespaceController func NewNamespaceController(kubeClient clientset.Interface, versions *unversioned.APIVersions, resyncPeriod time.Duration) *NamespaceController { - var controller *framework.Controller - _, controller = framework.NewInformer( + // create the controller so we can inject the enqueue function + namespaceController := &NamespaceController{ + kubeClient: kubeClient, + versions: versions, + queue: workqueue.New(), + } + + // configure the backing store/controller + store, controller := framework.NewInformer( &cache.ListWatch{ ListFunc: func(options api.ListOptions) (runtime.Object, error) { return kubeClient.Core().Namespaces().List(options) @@ -54,530 +68,88 @@ func NewNamespaceController(kubeClient clientset.Interface, versions *unversione }, }, &api.Namespace{}, - // TODO: Can we have much longer period here? resyncPeriod, framework.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { namespace := obj.(*api.Namespace) - if err := syncNamespace(kubeClient, versions, namespace); err != nil { - if estimate, ok := err.(*contentRemainingError); ok { - go func() { - // Estimate is the aggregate total of TerminationGracePeriodSeconds, which defaults to 30s - // for pods. However, most processes will terminate faster - within a few seconds, probably - // with a peak within 5-10s. So this division is a heuristic that avoids waiting the full - // duration when in many cases things complete more quickly. The extra second added is to - // ensure we never wait 0 seconds. 
- t := estimate.Estimate/2 + 1 - glog.V(4).Infof("Content remaining in namespace %s, waiting %d seconds", namespace.Name, t) - time.Sleep(time.Duration(t) * time.Second) - if err := controller.Requeue(namespace); err != nil { - utilruntime.HandleError(err) - } - }() - return - } - utilruntime.HandleError(err) - } + namespaceController.enqueueNamespace(namespace) }, UpdateFunc: func(oldObj, newObj interface{}) { namespace := newObj.(*api.Namespace) - if err := syncNamespace(kubeClient, versions, namespace); err != nil { - if estimate, ok := err.(*contentRemainingError); ok { - go func() { - t := estimate.Estimate/2 + 1 - glog.V(4).Infof("Content remaining in namespace %s, waiting %d seconds", namespace.Name, t) - time.Sleep(time.Duration(t) * time.Second) - if err := controller.Requeue(namespace); err != nil { - utilruntime.HandleError(err) - } - }() - return - } - utilruntime.HandleError(err) - } + namespaceController.enqueueNamespace(namespace) }, }, ) - return &NamespaceController{ - controller: controller, - } + namespaceController.store = store + namespaceController.controller = controller + return namespaceController } -// Run begins observing the system. It starts a goroutine and returns immediately. -func (nm *NamespaceController) Run() { - if nm.StopEverything == nil { - nm.StopEverything = make(chan struct{}) - go nm.controller.Run(nm.StopEverything) +// enqueueNamespace adds an object to the controller work queue +// obj could be an *api.Namespace, or a DeletionFinalStateUnknown item. +func (nm *NamespaceController) enqueueNamespace(obj interface{}) { + key, err := controller.KeyFunc(obj) + if err != nil { + glog.Errorf("Couldn't get key for object %+v: %v", obj, err) + return } + nm.queue.Add(key) } -// Stop gracefully shutsdown this controller -func (nm *NamespaceController) Stop() { - if nm.StopEverything != nil { - close(nm.StopEverything) - nm.StopEverything = nil - } -} - -// finalized returns true if the spec.finalizers is empty list -func finalized(namespace *api.Namespace) bool { - return len(namespace.Spec.Finalizers) == 0 -} - -// finalize will finalize the namespace for kubernetes -func finalizeNamespaceFunc(kubeClient clientset.Interface, namespace *api.Namespace) (*api.Namespace, error) { - namespaceFinalize := api.Namespace{} - namespaceFinalize.ObjectMeta = namespace.ObjectMeta - namespaceFinalize.Spec = namespace.Spec - finalizerSet := sets.NewString() - for i := range namespace.Spec.Finalizers { - if namespace.Spec.Finalizers[i] != api.FinalizerKubernetes { - finalizerSet.Insert(string(namespace.Spec.Finalizers[i])) - } - } - namespaceFinalize.Spec.Finalizers = make([]api.FinalizerName, 0, len(finalizerSet)) - for _, value := range finalizerSet.List() { - namespaceFinalize.Spec.Finalizers = append(namespaceFinalize.Spec.Finalizers, api.FinalizerName(value)) - } - namespace, err := kubeClient.Core().Namespaces().Finalize(&namespaceFinalize) - if err != nil { - // it was removed already, so life is good - if errors.IsNotFound(err) { - return namespace, nil - } - } - return namespace, err -} - -type contentRemainingError struct { - Estimate int64 -} - -func (e *contentRemainingError) Error() string { - return fmt.Sprintf("some content remains in the namespace, estimate %d seconds before it is removed", e.Estimate) -} - -// deleteAllContent will delete all content known to the system in a namespace. It returns an estimate -// of the time remaining before the remaining resources are deleted. If estimate > 0 not all resources -// are guaranteed to be gone. 
-func deleteAllContent(kubeClient clientset.Interface, versions *unversioned.APIVersions, namespace string, before unversioned.Time) (estimate int64, err error) { - err = deleteServiceAccounts(kubeClient, namespace) - if err != nil { - return estimate, err - } - err = deleteServices(kubeClient, namespace) - if err != nil { - return estimate, err - } - err = deleteReplicationControllers(kubeClient, namespace) - if err != nil { - return estimate, err - } - estimate, err = deletePods(kubeClient, namespace, before) - if err != nil { - return estimate, err - } - err = deleteSecrets(kubeClient, namespace) - if err != nil { - return estimate, err - } - err = deletePersistentVolumeClaims(kubeClient, namespace) - if err != nil { - return estimate, err - } - err = deleteLimitRanges(kubeClient, namespace) - if err != nil { - return estimate, err - } - err = deleteResourceQuotas(kubeClient, namespace) - if err != nil { - return estimate, err - } - err = deleteEvents(kubeClient, namespace) - if err != nil { - return estimate, err - } - // If experimental mode, delete all experimental resources for the namespace. - if containsVersion(versions, "extensions/v1beta1") { - resources, err := kubeClient.Discovery().ServerResourcesForGroupVersion("extensions/v1beta1") - if err != nil { - return estimate, err - } - if containsResource(resources, "horizontalpodautoscalers") { - err = deleteHorizontalPodAutoscalers(kubeClient.Extensions(), namespace) - if err != nil { - return estimate, err - } - } - if containsResource(resources, "ingresses") { - err = deleteIngress(kubeClient.Extensions(), namespace) - if err != nil { - return estimate, err - } - } - if containsResource(resources, "daemonsets") { - err = deleteDaemonSets(kubeClient.Extensions(), namespace) - if err != nil { - return estimate, err - } - } - if containsResource(resources, "jobs") { - err = deleteJobs(kubeClient.Extensions(), namespace) - if err != nil { - return estimate, err - } - } - if containsResource(resources, "deployments") { - err = deleteDeployments(kubeClient.Extensions(), namespace) - if err != nil { - return estimate, err - } - } - if containsResource(resources, "replicasets") { - err = deleteReplicaSets(kubeClient.Extensions(), namespace) - if err != nil { - return estimate, err - } - } - } - return estimate, nil -} - -// updateNamespaceFunc is a function that makes an update to a namespace -type updateNamespaceFunc func(kubeClient clientset.Interface, namespace *api.Namespace) (*api.Namespace, error) - -// retryOnConflictError retries the specified fn if there was a conflict error -// TODO RetryOnConflict should be a generic concept in client code -func retryOnConflictError(kubeClient clientset.Interface, namespace *api.Namespace, fn updateNamespaceFunc) (result *api.Namespace, err error) { - latestNamespace := namespace +// worker processes the queue of namespace objects. +// Each namespace can be in the queue at most once. +// The system ensures that no two workers can process +// the same namespace at the same time. 
+func (nm *NamespaceController) worker() { for { - result, err = fn(kubeClient, latestNamespace) - if err == nil { - return result, nil - } - if !errors.IsConflict(err) { - return nil, err - } - latestNamespace, err = kubeClient.Core().Namespaces().Get(latestNamespace.Name) - if err != nil { - return nil, err - } - } - return -} - -// updateNamespaceStatusFunc will verify that the status of the namespace is correct -func updateNamespaceStatusFunc(kubeClient clientset.Interface, namespace *api.Namespace) (*api.Namespace, error) { - if namespace.DeletionTimestamp.IsZero() || namespace.Status.Phase == api.NamespaceTerminating { - return namespace, nil - } - newNamespace := api.Namespace{} - newNamespace.ObjectMeta = namespace.ObjectMeta - newNamespace.Status = namespace.Status - newNamespace.Status.Phase = api.NamespaceTerminating - return kubeClient.Core().Namespaces().UpdateStatus(&newNamespace) -} - -// syncNamespace orchestrates deletion of a Namespace and its associated content. -func syncNamespace(kubeClient clientset.Interface, versions *unversioned.APIVersions, namespace *api.Namespace) error { - if namespace.DeletionTimestamp == nil { - return nil - } - - // multiple controllers may edit a namespace during termination - // first get the latest state of the namespace before proceeding - // if the namespace was deleted already, don't do anything - namespace, err := kubeClient.Core().Namespaces().Get(namespace.Name) - if err != nil { - if errors.IsNotFound(err) { - return nil - } - return err - } - - glog.V(4).Infof("Syncing namespace %s", namespace.Name) - - // ensure that the status is up to date on the namespace - // if we get a not found error, we assume the namespace is truly gone - namespace, err = retryOnConflictError(kubeClient, namespace, updateNamespaceStatusFunc) - if err != nil { - if errors.IsNotFound(err) { - return nil - } - return err - } - - // if the namespace is already finalized, delete it - if finalized(namespace) { - err = kubeClient.Core().Namespaces().Delete(namespace.Name, nil) - if err != nil && !errors.IsNotFound(err) { - return err - } - return nil - } - - // there may still be content for us to remove - estimate, err := deleteAllContent(kubeClient, versions, namespace.Name, *namespace.DeletionTimestamp) - if err != nil { - return err - } - if estimate > 0 { - return &contentRemainingError{estimate} - } - - // we have removed content, so mark it finalized by us - result, err := retryOnConflictError(kubeClient, namespace, finalizeNamespaceFunc) - if err != nil { - return err - } - - // now check if all finalizers have reported that we delete now - if finalized(result) { - err = kubeClient.Core().Namespaces().Delete(namespace.Name, nil) - if err != nil && !errors.IsNotFound(err) { - return err - } - } - - return nil -} - -func deleteLimitRanges(kubeClient clientset.Interface, ns string) error { - items, err := kubeClient.Core().LimitRanges(ns).List(api.ListOptions{}) - if err != nil { - return err - } - for i := range items.Items { - err := kubeClient.Core().LimitRanges(ns).Delete(items.Items[i].Name, nil) - if err != nil && !errors.IsNotFound(err) { - return err - } - } - return nil -} - -func deleteResourceQuotas(kubeClient clientset.Interface, ns string) error { - resourceQuotas, err := kubeClient.Core().ResourceQuotas(ns).List(api.ListOptions{}) - if err != nil { - return err - } - for i := range resourceQuotas.Items { - err := kubeClient.Core().ResourceQuotas(ns).Delete(resourceQuotas.Items[i].Name, nil) - if err != nil && !errors.IsNotFound(err) { - 
return err - } - } - return nil -} - -func deleteServiceAccounts(kubeClient clientset.Interface, ns string) error { - items, err := kubeClient.Core().ServiceAccounts(ns).List(api.ListOptions{}) - if err != nil { - return err - } - for i := range items.Items { - err := kubeClient.Core().ServiceAccounts(ns).Delete(items.Items[i].Name, nil) - if err != nil && !errors.IsNotFound(err) { - return err - } - } - return nil -} - -func deleteServices(kubeClient clientset.Interface, ns string) error { - items, err := kubeClient.Core().Services(ns).List(api.ListOptions{}) - if err != nil { - return err - } - for i := range items.Items { - err := kubeClient.Core().Services(ns).Delete(items.Items[i].Name, nil) - if err != nil && !errors.IsNotFound(err) { - return err - } - } - return nil -} - -func deleteReplicationControllers(kubeClient clientset.Interface, ns string) error { - items, err := kubeClient.Core().ReplicationControllers(ns).List(api.ListOptions{}) - if err != nil { - return err - } - for i := range items.Items { - err := kubeClient.Core().ReplicationControllers(ns).Delete(items.Items[i].Name, nil) - if err != nil && !errors.IsNotFound(err) { - return err - } - } - return nil -} - -func deletePods(kubeClient clientset.Interface, ns string, before unversioned.Time) (int64, error) { - items, err := kubeClient.Core().Pods(ns).List(api.ListOptions{}) - if err != nil { - return 0, err - } - expired := unversioned.Now().After(before.Time) - var deleteOptions *api.DeleteOptions - if expired { - deleteOptions = api.NewDeleteOptions(0) - } - estimate := int64(0) - for i := range items.Items { - if items.Items[i].Spec.TerminationGracePeriodSeconds != nil { - grace := *items.Items[i].Spec.TerminationGracePeriodSeconds - if grace > estimate { - estimate = grace + func() { + key, quit := nm.queue.Get() + if quit { + return } - } - err := kubeClient.Core().Pods(ns).Delete(items.Items[i].Name, deleteOptions) - if err != nil && !errors.IsNotFound(err) { - return 0, err - } + defer nm.queue.Done(key) + if err := nm.syncNamespaceFromKey(key.(string)); err != nil { + if estimate, ok := err.(*contentRemainingError); ok { + go func() { + t := estimate.Estimate/2 + 1 + glog.V(4).Infof("Content remaining in namespace %s, waiting %d seconds", key, t) + time.Sleep(time.Duration(t) * time.Second) + nm.queue.Add(key) + }() + } + } + }() } - if expired { - estimate = 0 - } - return estimate, nil } -func deleteEvents(kubeClient clientset.Interface, ns string) error { - return kubeClient.Core().Events(ns).DeleteCollection(nil, api.ListOptions{}) -} +// syncNamespaceFromKey looks for a namespace with the specified key in its store and synchronizes it +func (nm *NamespaceController) syncNamespaceFromKey(key string) (err error) { + startTime := time.Now() + defer glog.V(4).Infof("Finished syncing namespace %q (%v)", key, time.Now().Sub(startTime)) -func deleteSecrets(kubeClient clientset.Interface, ns string) error { - items, err := kubeClient.Core().Secrets(ns).List(api.ListOptions{}) + obj, exists, err := nm.store.GetByKey(key) + if !exists { + glog.Infof("Namespace has been deleted %v", key) + return nil + } if err != nil { + glog.Infof("Unable to retrieve namespace %v from store: %v", key, err) + nm.queue.Add(key) return err } - for i := range items.Items { - err := kubeClient.Core().Secrets(ns).Delete(items.Items[i].Name, nil) - if err != nil && !errors.IsNotFound(err) { - return err - } - } - return nil + namespace := obj.(*api.Namespace) + return syncNamespace(nm.kubeClient, nm.versions, namespace) } -func 
deletePersistentVolumeClaims(kubeClient clientset.Interface, ns string) error { - items, err := kubeClient.Core().PersistentVolumeClaims(ns).List(api.ListOptions{}) - if err != nil { - return err +// Run starts observing the system with the specified number of workers. +func (nm *NamespaceController) Run(workers int, stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + go nm.controller.Run(stopCh) + for i := 0; i < workers; i++ { + go wait.Until(nm.worker, time.Second, stopCh) } - for i := range items.Items { - err := kubeClient.Core().PersistentVolumeClaims(ns).Delete(items.Items[i].Name, nil) - if err != nil && !errors.IsNotFound(err) { - return err - } - } - return nil -} - -func deleteHorizontalPodAutoscalers(expClient extensions_unversioned.ExtensionsInterface, ns string) error { - items, err := expClient.HorizontalPodAutoscalers(ns).List(api.ListOptions{}) - if err != nil { - return err - } - for i := range items.Items { - err := expClient.HorizontalPodAutoscalers(ns).Delete(items.Items[i].Name, nil) - if err != nil && !errors.IsNotFound(err) { - return err - } - } - return nil -} - -func deleteDaemonSets(expClient extensions_unversioned.ExtensionsInterface, ns string) error { - items, err := expClient.DaemonSets(ns).List(api.ListOptions{}) - if err != nil { - return err - } - for i := range items.Items { - err := expClient.DaemonSets(ns).Delete(items.Items[i].Name, nil) - if err != nil && !errors.IsNotFound(err) { - return err - } - } - return nil -} - -func deleteJobs(expClient extensions_unversioned.ExtensionsInterface, ns string) error { - items, err := expClient.Jobs(ns).List(api.ListOptions{}) - if err != nil { - return err - } - for i := range items.Items { - err := expClient.Jobs(ns).Delete(items.Items[i].Name, nil) - if err != nil && !errors.IsNotFound(err) { - return err - } - } - return nil -} - -func deleteDeployments(expClient extensions_unversioned.ExtensionsInterface, ns string) error { - items, err := expClient.Deployments(ns).List(api.ListOptions{}) - if err != nil { - return err - } - for i := range items.Items { - err := expClient.Deployments(ns).Delete(items.Items[i].Name, nil) - if err != nil && !errors.IsNotFound(err) { - return err - } - } - return nil -} - -func deleteIngress(expClient extensions_unversioned.ExtensionsInterface, ns string) error { - items, err := expClient.Ingresses(ns).List(api.ListOptions{}) - if err != nil { - return err - } - for i := range items.Items { - err := expClient.Ingresses(ns).Delete(items.Items[i].Name, nil) - if err != nil && !errors.IsNotFound(err) { - return err - } - } - return nil -} - -func deleteReplicaSets(expClient extensions_unversioned.ExtensionsInterface, ns string) error { - items, err := expClient.ReplicaSets(ns).List(api.ListOptions{}) - if err != nil { - return err - } - for i := range items.Items { - err := expClient.ReplicaSets(ns).Delete(items.Items[i].Name, nil) - if err != nil && !errors.IsNotFound(err) { - return err - } - } - return nil -} - -// TODO: this is duplicated logic. Move it somewhere central? -func containsVersion(versions *unversioned.APIVersions, version string) bool { - for ix := range versions.Versions { - if versions.Versions[ix] == version { - return true - } - } - return false -} - -// TODO: this is duplicated logic. Move it somewhere central? 
-func containsResource(resources *unversioned.APIResourceList, resourceName string) bool { - if resources == nil { - return false - } - for ix := range resources.APIResources { - resource := resources.APIResources[ix] - if resource.Name == resourceName { - return true - } - } - return false + <-stopCh + glog.Infof("Shutting down NamespaceController") + nm.queue.ShutDown() } diff --git a/pkg/controller/namespace/namespace_controller_test.go b/pkg/controller/namespace/namespace_controller_test.go index 397dab70f8e..d745d4fdc95 100644 --- a/pkg/controller/namespace/namespace_controller_test.go +++ b/pkg/controller/namespace/namespace_controller_test.go @@ -20,7 +20,6 @@ import ( "fmt" "strings" "testing" - "time" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" @@ -104,25 +103,25 @@ func testSyncNamespaceThatIsTerminating(t *testing.T, versions *unversioned.APIV // TODO: Reuse the constants for all these strings from testclient pendingActionSet := sets.NewString( strings.Join([]string{"get", "namespaces", ""}, "-"), - strings.Join([]string{"list", "replicationcontrollers", ""}, "-"), + strings.Join([]string{"delete-collection", "replicationcontrollers", ""}, "-"), strings.Join([]string{"list", "services", ""}, "-"), strings.Join([]string{"list", "pods", ""}, "-"), - strings.Join([]string{"list", "resourcequotas", ""}, "-"), - strings.Join([]string{"list", "secrets", ""}, "-"), - strings.Join([]string{"list", "limitranges", ""}, "-"), + strings.Join([]string{"delete-collection", "resourcequotas", ""}, "-"), + strings.Join([]string{"delete-collection", "secrets", ""}, "-"), + strings.Join([]string{"delete-collection", "limitranges", ""}, "-"), strings.Join([]string{"delete-collection", "events", ""}, "-"), - strings.Join([]string{"list", "serviceaccounts", ""}, "-"), - strings.Join([]string{"list", "persistentvolumeclaims", ""}, "-"), + strings.Join([]string{"delete-collection", "serviceaccounts", ""}, "-"), + strings.Join([]string{"delete-collection", "persistentvolumeclaims", ""}, "-"), strings.Join([]string{"create", "namespaces", "finalize"}, "-"), ) if containsVersion(versions, "extensions/v1beta1") { pendingActionSet.Insert( - strings.Join([]string{"list", "daemonsets", ""}, "-"), - strings.Join([]string{"list", "deployments", ""}, "-"), - strings.Join([]string{"list", "jobs", ""}, "-"), - strings.Join([]string{"list", "horizontalpodautoscalers", ""}, "-"), - strings.Join([]string{"list", "ingresses", ""}, "-"), + strings.Join([]string{"delete-collection", "daemonsets", ""}, "-"), + strings.Join([]string{"delete-collection", "deployments", ""}, "-"), + strings.Join([]string{"delete-collection", "jobs", ""}, "-"), + strings.Join([]string{"delete-collection", "horizontalpodautoscalers", ""}, "-"), + strings.Join([]string{"delete-collection", "ingresses", ""}, "-"), strings.Join([]string{"get", "resource", ""}, "-"), ) } @@ -225,25 +224,3 @@ func TestSyncNamespaceThatIsActive(t *testing.T) { t.Errorf("Expected no action from controller, but got: %v", mockClient.Actions()) } } - -func TestRunStop(t *testing.T) { - mockClient := &fake.Clientset{} - - nsController := NewNamespaceController(mockClient, &unversioned.APIVersions{}, 1*time.Second) - - if nsController.StopEverything != nil { - t.Errorf("Non-running manager should not have a stop channel. Got %v", nsController.StopEverything) - } - - nsController.Run() - - if nsController.StopEverything == nil { - t.Errorf("Running manager should have a stop channel. 
Got nil") - } - - nsController.Stop() - - if nsController.StopEverything != nil { - t.Errorf("Non-running manager should not have a stop channel. Got %v", nsController.StopEverything) - } -} diff --git a/pkg/controller/namespace/namespace_controller_utils.go b/pkg/controller/namespace/namespace_controller_utils.go new file mode 100644 index 00000000000..f4aa3317556 --- /dev/null +++ b/pkg/controller/namespace/namespace_controller_utils.go @@ -0,0 +1,364 @@ +/* +Copyright 2015 The Kubernetes Authors All rights reserved. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package namespace + +import ( + "fmt" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/errors" + "k8s.io/kubernetes/pkg/api/unversioned" + clientset "k8s.io/kubernetes/pkg/client/clientset_generated/internalclientset" + extensions_unversioned "k8s.io/kubernetes/pkg/client/typed/generated/extensions/unversioned" + "k8s.io/kubernetes/pkg/util/sets" + + "github.com/golang/glog" +) + +// contentRemainingError is used to inform the caller that content is not fully removed from the namespace +type contentRemainingError struct { + Estimate int64 +} + +func (e *contentRemainingError) Error() string { + return fmt.Sprintf("some content remains in the namespace, estimate %d seconds before it is removed", e.Estimate) +} + +// updateNamespaceFunc is a function that makes an update to a namespace +type updateNamespaceFunc func(kubeClient clientset.Interface, namespace *api.Namespace) (*api.Namespace, error) + +// retryOnConflictError retries the specified fn if there was a conflict error +// TODO RetryOnConflict should be a generic concept in client code +func retryOnConflictError(kubeClient clientset.Interface, namespace *api.Namespace, fn updateNamespaceFunc) (result *api.Namespace, err error) { + latestNamespace := namespace + for { + result, err = fn(kubeClient, latestNamespace) + if err == nil { + return result, nil + } + if !errors.IsConflict(err) { + return nil, err + } + latestNamespace, err = kubeClient.Core().Namespaces().Get(latestNamespace.Name) + if err != nil { + return nil, err + } + } + return +} + +// updateNamespaceStatusFunc will verify that the status of the namespace is correct +func updateNamespaceStatusFunc(kubeClient clientset.Interface, namespace *api.Namespace) (*api.Namespace, error) { + if namespace.DeletionTimestamp.IsZero() || namespace.Status.Phase == api.NamespaceTerminating { + return namespace, nil + } + newNamespace := api.Namespace{} + newNamespace.ObjectMeta = namespace.ObjectMeta + newNamespace.Status = namespace.Status + newNamespace.Status.Phase = api.NamespaceTerminating + return kubeClient.Core().Namespaces().UpdateStatus(&newNamespace) +} + +// finalized returns true if the namespace.Spec.Finalizers is an empty list +func finalized(namespace *api.Namespace) bool { + return len(namespace.Spec.Finalizers) == 0 +} + +// finalizeNamespaceFunc removes the kubernetes token and finalizes the namespace +func finalizeNamespaceFunc(kubeClient clientset.Interface, namespace *api.Namespace) (*api.Namespace, error) 
{ + namespaceFinalize := api.Namespace{} + namespaceFinalize.ObjectMeta = namespace.ObjectMeta + namespaceFinalize.Spec = namespace.Spec + finalizerSet := sets.NewString() + for i := range namespace.Spec.Finalizers { + if namespace.Spec.Finalizers[i] != api.FinalizerKubernetes { + finalizerSet.Insert(string(namespace.Spec.Finalizers[i])) + } + } + namespaceFinalize.Spec.Finalizers = make([]api.FinalizerName, 0, len(finalizerSet)) + for _, value := range finalizerSet.List() { + namespaceFinalize.Spec.Finalizers = append(namespaceFinalize.Spec.Finalizers, api.FinalizerName(value)) + } + namespace, err := kubeClient.Core().Namespaces().Finalize(&namespaceFinalize) + if err != nil { + // it was removed already, so life is good + if errors.IsNotFound(err) { + return namespace, nil + } + } + return namespace, err +} + +// deleteAllContent will delete all content known to the system in a namespace. It returns an estimate +// of the time remaining before the remaining resources are deleted. If estimate > 0 not all resources +// are guaranteed to be gone. +// TODO: this should use discovery to delete arbitrary namespace content +func deleteAllContent(kubeClient clientset.Interface, versions *unversioned.APIVersions, namespace string, before unversioned.Time) (estimate int64, err error) { + err = deleteServiceAccounts(kubeClient, namespace) + if err != nil { + return estimate, err + } + err = deleteServices(kubeClient, namespace) + if err != nil { + return estimate, err + } + err = deleteReplicationControllers(kubeClient, namespace) + if err != nil { + return estimate, err + } + estimate, err = deletePods(kubeClient, namespace, before) + if err != nil { + return estimate, err + } + err = deleteSecrets(kubeClient, namespace) + if err != nil { + return estimate, err + } + err = deletePersistentVolumeClaims(kubeClient, namespace) + if err != nil { + return estimate, err + } + err = deleteLimitRanges(kubeClient, namespace) + if err != nil { + return estimate, err + } + err = deleteResourceQuotas(kubeClient, namespace) + if err != nil { + return estimate, err + } + err = deleteEvents(kubeClient, namespace) + if err != nil { + return estimate, err + } + // If experimental mode, delete all experimental resources for the namespace. + if containsVersion(versions, "extensions/v1beta1") { + resources, err := kubeClient.Discovery().ServerResourcesForGroupVersion("extensions/v1beta1") + if err != nil { + return estimate, err + } + if containsResource(resources, "horizontalpodautoscalers") { + err = deleteHorizontalPodAutoscalers(kubeClient.Extensions(), namespace) + if err != nil { + return estimate, err + } + } + if containsResource(resources, "ingresses") { + err = deleteIngress(kubeClient.Extensions(), namespace) + if err != nil { + return estimate, err + } + } + if containsResource(resources, "daemonsets") { + err = deleteDaemonSets(kubeClient.Extensions(), namespace) + if err != nil { + return estimate, err + } + } + if containsResource(resources, "jobs") { + err = deleteJobs(kubeClient.Extensions(), namespace) + if err != nil { + return estimate, err + } + } + if containsResource(resources, "deployments") { + err = deleteDeployments(kubeClient.Extensions(), namespace) + if err != nil { + return estimate, err + } + } + } + return estimate, nil +} + +// syncNamespace orchestrates deletion of a Namespace and its associated content. 
+func syncNamespace(kubeClient clientset.Interface, versions *unversioned.APIVersions, namespace *api.Namespace) error { + if namespace.DeletionTimestamp == nil { + return nil + } + + // multiple controllers may edit a namespace during termination + // first get the latest state of the namespace before proceeding + // if the namespace was deleted already, don't do anything + namespace, err := kubeClient.Core().Namespaces().Get(namespace.Name) + if err != nil { + if errors.IsNotFound(err) { + return nil + } + return err + } + + glog.V(4).Infof("Syncing namespace %s", namespace.Name) + + // ensure that the status is up to date on the namespace + // if we get a not found error, we assume the namespace is truly gone + namespace, err = retryOnConflictError(kubeClient, namespace, updateNamespaceStatusFunc) + if err != nil { + if errors.IsNotFound(err) { + return nil + } + return err + } + + // if the namespace is already finalized, delete it + if finalized(namespace) { + err = kubeClient.Core().Namespaces().Delete(namespace.Name, nil) + if err != nil && !errors.IsNotFound(err) { + return err + } + return nil + } + + // there may still be content for us to remove + estimate, err := deleteAllContent(kubeClient, versions, namespace.Name, *namespace.DeletionTimestamp) + if err != nil { + return err + } + if estimate > 0 { + return &contentRemainingError{estimate} + } + + // we have removed content, so mark it finalized by us + result, err := retryOnConflictError(kubeClient, namespace, finalizeNamespaceFunc) + if err != nil { + return err + } + + // now check if all finalizers have reported that we delete now + if finalized(result) { + err = kubeClient.Core().Namespaces().Delete(namespace.Name, nil) + if err != nil && !errors.IsNotFound(err) { + return err + } + } + + return nil +} + +func deleteLimitRanges(kubeClient clientset.Interface, ns string) error { + return kubeClient.Core().LimitRanges(ns).DeleteCollection(nil, api.ListOptions{}) +} + +func deleteResourceQuotas(kubeClient clientset.Interface, ns string) error { + return kubeClient.Core().ResourceQuotas(ns).DeleteCollection(nil, api.ListOptions{}) +} + +func deleteServiceAccounts(kubeClient clientset.Interface, ns string) error { + return kubeClient.Core().ServiceAccounts(ns).DeleteCollection(nil, api.ListOptions{}) +} + +func deleteServices(kubeClient clientset.Interface, ns string) error { + items, err := kubeClient.Core().Services(ns).List(api.ListOptions{}) + if err != nil { + return err + } + for i := range items.Items { + err := kubeClient.Core().Services(ns).Delete(items.Items[i].Name, nil) + if err != nil && !errors.IsNotFound(err) { + return err + } + } + return nil +} + +func deleteReplicationControllers(kubeClient clientset.Interface, ns string) error { + return kubeClient.Core().ReplicationControllers(ns).DeleteCollection(nil, api.ListOptions{}) +} + +func deletePods(kubeClient clientset.Interface, ns string, before unversioned.Time) (int64, error) { + items, err := kubeClient.Core().Pods(ns).List(api.ListOptions{}) + if err != nil { + return 0, err + } + expired := unversioned.Now().After(before.Time) + var deleteOptions *api.DeleteOptions + if expired { + deleteOptions = api.NewDeleteOptions(0) + } + estimate := int64(0) + for i := range items.Items { + if items.Items[i].Spec.TerminationGracePeriodSeconds != nil { + grace := *items.Items[i].Spec.TerminationGracePeriodSeconds + if grace > estimate { + estimate = grace + } + } + err := kubeClient.Core().Pods(ns).Delete(items.Items[i].Name, deleteOptions) + if err != nil && 
!errors.IsNotFound(err) { + return 0, err + } + } + if expired { + estimate = 0 + } + return estimate, nil +} + +func deleteEvents(kubeClient clientset.Interface, ns string) error { + return kubeClient.Core().Events(ns).DeleteCollection(nil, api.ListOptions{}) +} + +func deleteSecrets(kubeClient clientset.Interface, ns string) error { + return kubeClient.Core().Secrets(ns).DeleteCollection(nil, api.ListOptions{}) +} + +func deletePersistentVolumeClaims(kubeClient clientset.Interface, ns string) error { + return kubeClient.Core().PersistentVolumeClaims(ns).DeleteCollection(nil, api.ListOptions{}) +} + +func deleteHorizontalPodAutoscalers(expClient extensions_unversioned.ExtensionsInterface, ns string) error { + return expClient.HorizontalPodAutoscalers(ns).DeleteCollection(nil, api.ListOptions{}) +} + +func deleteDaemonSets(expClient extensions_unversioned.ExtensionsInterface, ns string) error { + return expClient.DaemonSets(ns).DeleteCollection(nil, api.ListOptions{}) +} + +func deleteJobs(expClient extensions_unversioned.ExtensionsInterface, ns string) error { + return expClient.Jobs(ns).DeleteCollection(nil, api.ListOptions{}) +} + +func deleteDeployments(expClient extensions_unversioned.ExtensionsInterface, ns string) error { + return expClient.Deployments(ns).DeleteCollection(nil, api.ListOptions{}) +} + +func deleteIngress(expClient extensions_unversioned.ExtensionsInterface, ns string) error { + return expClient.Ingresses(ns).DeleteCollection(nil, api.ListOptions{}) +} + +// TODO: this is duplicated logic. Move it somewhere central? +func containsVersion(versions *unversioned.APIVersions, version string) bool { + for ix := range versions.Versions { + if versions.Versions[ix] == version { + return true + } + } + return false +} + +// TODO: this is duplicated logic. Move it somewhere central? +func containsResource(resources *unversioned.APIResourceList, resourceName string) bool { + if resources == nil { + return false + } + for ix := range resources.APIResources { + resource := resources.APIResources[ix] + if resource.Name == resourceName { + return true + } + } + return false +}
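
For context on the pattern this patch adopts: namespace syncing moves out of the informer's event handlers and into a pool of workers that drain a shared work queue, sized by the new `--concurrent-namespace-syncs` flag (default 2), with a delayed requeue when a namespace still has terminating content. The sketch below is not part of the patch; it is a minimal, self-contained Go illustration of that worker/requeue pattern under toy assumptions — the channel-backed `queue`, `fakeSync`, and all names in it are stand-ins for `pkg/util/workqueue` and `NamespaceController.syncNamespaceFromKey`, not code from this PR.

```go
// Illustrative sketch only -- not part of the patch above.
// A fixed pool of workers (compare --concurrent-namespace-syncs) drains a
// shared queue of namespace keys; a key whose content is still terminating is
// re-added after a delay, mirroring the contentRemainingError handling in
// worker(). The queue and fakeSync here are toy substitutes, not the real APIs.
package main

import (
	"fmt"
	"sync"
	"time"
)

// queue keeps each key enqueued at most once, loosely like workqueue.Type.
type queue struct {
	mu      sync.Mutex
	items   chan string
	pending map[string]bool
}

func newQueue(size int) *queue {
	return &queue{items: make(chan string, size), pending: map[string]bool{}}
}

func (q *queue) add(key string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	if q.pending[key] {
		return // already queued; a namespace is processed by one worker at a time
	}
	q.pending[key] = true
	q.items <- key
}

func (q *queue) done(key string) {
	q.mu.Lock()
	defer q.mu.Unlock()
	delete(q.pending, key)
}

// fakeSync pretends content remains in "slow" on its first pass and returns a
// wait hint, the way syncNamespace surfaces a contentRemainingError estimate.
var (
	syncMu     sync.Mutex
	syncedOnce = map[string]bool{}
)

func fakeSync(key string) time.Duration {
	syncMu.Lock()
	defer syncMu.Unlock()
	if key == "slow" && !syncedOnce[key] {
		syncedOnce[key] = true
		return 200 * time.Millisecond
	}
	return 0
}

func worker(id int, q *queue, wg *sync.WaitGroup) {
	defer wg.Done()
	for key := range q.items {
		wait := fakeSync(key)
		q.done(key)
		if wait > 0 {
			fmt.Printf("worker %d: content remains in %q, requeueing in %v\n", id, key, wait)
			go func(k string, d time.Duration) { // delayed requeue, as in worker()
				time.Sleep(d)
				q.add(k)
			}(key, wait)
			continue
		}
		fmt.Printf("worker %d: finished %q\n", id, key)
	}
}

func main() {
	q := newQueue(16)
	var wg sync.WaitGroup
	const workers = 2 // analogous to ConcurrentNamespaceSyncs
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go worker(i, q, &wg)
	}
	for _, ns := range []string{"default", "slow", "kube-system"} {
		q.add(ns)
	}
	time.Sleep(time.Second) // let the requeued key come back around
	close(q.items)          // crude stand-in for queue.ShutDown()
	wg.Wait()
}
```

The dedup-on-add behavior is what lets the informer handlers in the patch simply enqueue a key and return: as the comment on worker() notes, each namespace is in the queue at most once and no two workers process the same namespace concurrently, so slow deletions in one namespace no longer block processing of the others.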