diff --git a/pkg/controller/framework/controller.go b/pkg/controller/framework/controller.go
index bb839b3cd98..ca182343aa4 100644
--- a/pkg/controller/framework/controller.go
+++ b/pkg/controller/framework/controller.go
@@ -106,6 +106,16 @@ func (c *Controller) HasSynced() bool {
 	return c.reflector.LastSyncResourceVersion() != ""
 }
 
+// Requeue adds the provided object back into the queue if it does not already exist.
+func (c *Controller) Requeue(obj interface{}) error {
+	return c.config.Queue.AddIfNotPresent(cache.Deltas{
+		cache.Delta{
+			Type:   cache.Sync,
+			Object: obj,
+		},
+	})
+}
+
 // processLoop drains the work queue.
 // TODO: Consider doing the processing in parallel. This will require a little thought
 // to make sure that we don't end up processing the same object multiple times
diff --git a/pkg/controller/namespace/namespace_controller.go b/pkg/controller/namespace/namespace_controller.go
index 40f33e1c183..17963877aae 100644
--- a/pkg/controller/namespace/namespace_controller.go
+++ b/pkg/controller/namespace/namespace_controller.go
@@ -17,6 +17,7 @@ limitations under the License.
 package namespacecontroller
 
 import (
+	"fmt"
 	"time"
 
 	"k8s.io/kubernetes/pkg/api"
@@ -41,7 +42,8 @@ type NamespaceController struct {
 // NewNamespaceController creates a new NamespaceController
 func NewNamespaceController(kubeClient client.Interface, resyncPeriod time.Duration) *NamespaceController {
-	_, controller := framework.NewInformer(
+	var controller *framework.Controller
+	_, controller = framework.NewInformer(
 		&cache.ListWatch{
 			ListFunc: func() (runtime.Object, error) {
 				return kubeClient.Namespaces().List(labels.Everything(), fields.Everything())
 			},
@@ -55,16 +57,41 @@ func NewNamespaceController(kubeClient client.Interface, resyncPeriod time.Durat
 		framework.ResourceEventHandlerFuncs{
 			AddFunc: func(obj interface{}) {
 				namespace := obj.(*api.Namespace)
-				err := syncNamespace(kubeClient, *namespace)
-				if err != nil {
-					glog.Error(err)
+				if err := syncNamespace(kubeClient, *namespace); err != nil {
+					if estimate, ok := err.(*contentRemainingError); ok {
+						go func() {
+							// Estimate is the maximum TerminationGracePeriodSeconds of the remaining pods,
+							// which defaults to 30s. However, most processes will terminate faster - within
+							// a few seconds, probably with a peak within 5-10s. So this division is a
+							// heuristic that avoids waiting the full duration when in many cases things
+							// complete more quickly. The extra second added is to ensure we never wait 0 seconds.
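+							// For example, pods that all use the default grace period produce an
+							// estimate of 30, so we requeue after 30/2 + 1 = 16 seconds.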
+							t := estimate.Estimate/2 + 1
+							glog.V(4).Infof("Content remaining in namespace %s, waiting %d seconds", namespace.Name, t)
+							time.Sleep(time.Duration(t) * time.Second)
+							if err := controller.Requeue(namespace); err != nil {
+								util.HandleError(err)
+							}
+						}()
+						return
+					}
+					util.HandleError(err)
 				}
 			},
 			UpdateFunc: func(oldObj, newObj interface{}) {
 				namespace := newObj.(*api.Namespace)
-				err := syncNamespace(kubeClient, *namespace)
-				if err != nil {
-					glog.Error(err)
+				if err := syncNamespace(kubeClient, *namespace); err != nil {
+					if estimate, ok := err.(*contentRemainingError); ok {
+						go func() {
+							t := estimate.Estimate/2 + 1
+							glog.V(4).Infof("Content remaining in namespace %s, waiting %d seconds", namespace.Name, t)
+							time.Sleep(time.Duration(t) * time.Second)
+							if err := controller.Requeue(namespace); err != nil {
+								util.HandleError(err)
+							}
+						}()
+						return
+					}
+					util.HandleError(err)
 				}
 			},
 		},
@@ -114,45 +141,56 @@ func finalize(kubeClient client.Interface, namespace api.Namespace) (*api.Namesp
 	return kubeClient.Namespaces().Finalize(&namespaceFinalize)
 }
 
-// deleteAllContent will delete all content known to the system in a namespace
-func deleteAllContent(kubeClient client.Interface, namespace string) (err error) {
+type contentRemainingError struct {
+	Estimate int64
+}
+
+func (e *contentRemainingError) Error() string {
+	return fmt.Sprintf("some content remains in the namespace, estimate %d seconds before it is removed", e.Estimate)
+}
+
+// deleteAllContent will delete all content known to the system in a namespace. It returns an estimate
+// of the time remaining before the remaining resources are deleted. If estimate > 0, not all resources
+// are guaranteed to be gone.
+func deleteAllContent(kubeClient client.Interface, namespace string, before util.Time) (estimate int64, err error) {
 	err = deleteServiceAccounts(kubeClient, namespace)
 	if err != nil {
-		return err
+		return estimate, err
 	}
 	err = deleteServices(kubeClient, namespace)
 	if err != nil {
-		return err
+		return estimate, err
 	}
 	err = deleteReplicationControllers(kubeClient, namespace)
 	if err != nil {
-		return err
+		return estimate, err
 	}
-	err = deletePods(kubeClient, namespace)
+	estimate, err = deletePods(kubeClient, namespace, before)
 	if err != nil {
-		return err
+		return estimate, err
 	}
 	err = deleteSecrets(kubeClient, namespace)
 	if err != nil {
-		return err
+		return estimate, err
 	}
 	err = deletePersistentVolumeClaims(kubeClient, namespace)
 	if err != nil {
-		return err
+		return estimate, err
 	}
 	err = deleteLimitRanges(kubeClient, namespace)
 	if err != nil {
-		return err
+		return estimate, err
 	}
 	err = deleteResourceQuotas(kubeClient, namespace)
 	if err != nil {
-		return err
+		return estimate, err
 	}
 	err = deleteEvents(kubeClient, namespace)
 	if err != nil {
-		return err
+		return estimate, err
 	}
-	return nil
+
+	return estimate, nil
 }
 
 // syncNamespace makes namespace life-cycle decisions
@@ -160,6 +198,7 @@ func syncNamespace(kubeClient client.Interface, namespace api.Namespace) (err er
 	if namespace.DeletionTimestamp == nil {
 		return nil
 	}
+	glog.V(4).Infof("Syncing namespace %s", namespace.Name)
 
 	// if there is a deletion timestamp, and the status is not terminating, then update status
 	if !namespace.DeletionTimestamp.IsZero() && namespace.Status.Phase != api.NamespaceTerminating {
@@ -185,10 +224,13 @@ func syncNamespace(kubeClient client.Interface, namespace api.Namespace) (err er
 	}
 
 	// there may still be content for us to remove
-	err = deleteAllContent(kubeClient, namespace.Name)
+	estimate, err := deleteAllContent(kubeClient, namespace.Name, *namespace.DeletionTimestamp)
 	if err != nil {
 		return err
 	}
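+	// A positive estimate means some pods have not yet finished terminating;
+	// returning it as an error lets the event handlers above requeue the
+	// namespace and retry after a delay instead of finalizing it early.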
+	if estimate > 0 {
+		return &contentRemainingError{estimate}
+	}
 
 	// we have removed content, so mark it finalized by us
 	result, err := finalize(kubeClient, namespace)
@@ -277,18 +319,33 @@ func deleteReplicationControllers(kubeClient client.Interface, ns string) error
 	return nil
 }
 
-func deletePods(kubeClient client.Interface, ns string) error {
+func deletePods(kubeClient client.Interface, ns string, before util.Time) (int64, error) {
 	items, err := kubeClient.Pods(ns).List(labels.Everything(), fields.Everything())
 	if err != nil {
-		return err
+		return 0, err
 	}
+	expired := util.Now().After(before.Time)
+	var deleteOptions *api.DeleteOptions
+	if expired {
+		deleteOptions = api.NewDeleteOptions(0)
+	}
+	estimate := int64(0)
 	for i := range items.Items {
-		err := kubeClient.Pods(ns).Delete(items.Items[i].Name, nil)
+		if items.Items[i].Spec.TerminationGracePeriodSeconds != nil {
+			grace := *items.Items[i].Spec.TerminationGracePeriodSeconds
+			if grace > estimate {
+				estimate = grace
+			}
+		}
+		err := kubeClient.Pods(ns).Delete(items.Items[i].Name, deleteOptions)
 		if err != nil && !errors.IsNotFound(err) {
-			return err
+			return 0, err
 		}
 	}
-	return nil
+	if expired {
+		estimate = 0
+	}
+	return estimate, nil
 }
 
 func deleteEvents(kubeClient client.Interface, ns string) error {