From c3baf402f5c273da5a73ac078efeff905644ed3a Mon Sep 17 00:00:00 2001
From: Chao Xu
Date: Thu, 23 Feb 2017 11:16:13 -0800
Subject: [PATCH] gc changes

---
 pkg/controller/controller_ref_manager.go       |    2 +-
 pkg/controller/controller_utils.go             |    3 +
 .../garbagecollector/garbagecollector.go       | 1045 ++++++-----
 .../garbagecollector/garbagecollector_test.go  |  153 ++-
 pkg/controller/garbagecollector/graph.go       |  159 +++
 .../garbagecollector/graph_builder.go          |  497 ++++++++
 pkg/controller/garbagecollector/operations.go  |  135 +++
 pkg/controller/garbagecollector/patch.go       |   54 +
 pkg/controller/replicaset/replica_set.go       |   14 +-
 .../replication/replication_controller.go      |   13 +-
 10 files changed, 1324 insertions(+), 751 deletions(-)
 create mode 100644 pkg/controller/garbagecollector/graph.go
 create mode 100644 pkg/controller/garbagecollector/graph_builder.go
 create mode 100644 pkg/controller/garbagecollector/operations.go
 create mode 100644 pkg/controller/garbagecollector/patch.go

diff --git a/pkg/controller/controller_ref_manager.go b/pkg/controller/controller_ref_manager.go
index 7fcc78bf508..226c9b8d3ec 100644
--- a/pkg/controller/controller_ref_manager.go
+++ b/pkg/controller/controller_ref_manager.go
@@ -191,7 +191,7 @@ func (m *PodControllerRefManager) AdoptPod(pod *v1.Pod) error {
 	// Note that ValidateOwnerReferences() will reject this patch if another
 	// OwnerReference exists with controller=true.
 	addControllerPatch := fmt.Sprintf(
-		`{"metadata":{"ownerReferences":[{"apiVersion":"%s","kind":"%s","name":"%s","uid":"%s","controller":true}],"uid":"%s"}}`,
+		`{"metadata":{"ownerReferences":[{"apiVersion":"%s","kind":"%s","name":"%s","uid":"%s","controller":true,"blockOwnerDeletion":true}],"uid":"%s"}}`,
 		m.controllerKind.GroupVersion(), m.controllerKind.Kind,
 		m.controller.GetName(), m.controller.GetUID(), pod.UID)
 	return m.podControl.PatchPod(pod.Namespace, pod.Name, []byte(addControllerPatch))
diff --git a/pkg/controller/controller_utils.go b/pkg/controller/controller_utils.go
index 1bf2848ba8c..e090ee93fbf 100644
--- a/pkg/controller/controller_utils.go
+++ b/pkg/controller/controller_utils.go
@@ -483,6 +483,9 @@ func (r RealPodControl) CreatePodsWithControllerRef(namespace string, template *
 	if controllerRef.Controller == nil || *controllerRef.Controller != true {
 		return fmt.Errorf("controllerRef.Controller is not set")
 	}
+	if controllerRef.BlockOwnerDeletion == nil || *controllerRef.BlockOwnerDeletion != true {
+		return fmt.Errorf("controllerRef.BlockOwnerDeletion is not set")
+	}
 	return r.createPods("", namespace, template, controllerObject, controllerRef)
 }
diff --git a/pkg/controller/garbagecollector/garbagecollector.go b/pkg/controller/garbagecollector/garbagecollector.go
index 3e61ce18e83..1e12ff259bd 100644
--- a/pkg/controller/garbagecollector/garbagecollector.go
+++ b/pkg/controller/garbagecollector/garbagecollector.go
@@ -18,7 +18,6 @@ package garbagecollector

 import (
 	"fmt"
-	"sync"
 	"time"

 	"github.com/golang/glog"

 	"k8s.io/apimachinery/pkg/api/meta"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
-	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/types"
 	utilerrors "k8s.io/apimachinery/pkg/util/errors"
 	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
-	"k8s.io/apimachinery/pkg/util/sets"
 	"k8s.io/apimachinery/pkg/util/wait"
-	"k8s.io/apimachinery/pkg/watch"
 	"k8s.io/client-go/dynamic"
 	"k8s.io/client-go/tools/cache"
-	"k8s.io/client-go/util/clock"
"k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly" - + // install the prometheus plugin + _ "k8s.io/kubernetes/pkg/util/workqueue/prometheus" // import known versions _ "k8s.io/client-go/kubernetes" ) const ResourceResyncTime time.Duration = 0 -type monitor struct { - store cache.Store - controller cache.Controller -} - -type objectReference struct { - metav1.OwnerReference - // This is needed by the dynamic client - Namespace string -} - -func (s objectReference) String() string { - return fmt.Sprintf("[%s/%s, namespace: %s, name: %s, uid: %s]", s.APIVersion, s.Kind, s.Namespace, s.Name, s.UID) -} - -// node does not require a lock to protect. The single-threaded -// Propagator.processEvent() is the sole writer of the nodes. The multi-threaded -// GarbageCollector.processItem() reads the nodes, but it only reads the fields -// that never get changed by Propagator.processEvent(). -type node struct { - identity objectReference - // dependents will be read by the orphan() routine, we need to protect it with a lock. - dependentsLock sync.RWMutex - dependents map[*node]struct{} - // When processing an Update event, we need to compare the updated - // ownerReferences with the owners recorded in the graph. - owners []metav1.OwnerReference -} - -func (ownerNode *node) addDependent(dependent *node) { - ownerNode.dependentsLock.Lock() - defer ownerNode.dependentsLock.Unlock() - ownerNode.dependents[dependent] = struct{}{} -} - -func (ownerNode *node) deleteDependent(dependent *node) { - ownerNode.dependentsLock.Lock() - defer ownerNode.dependentsLock.Unlock() - delete(ownerNode.dependents, dependent) -} - -type eventType int - -const ( - addEvent eventType = iota - updateEvent - deleteEvent -) - -type event struct { - eventType eventType - obj interface{} - // the update event comes with an old object, but it's not used by the garbage collector. - oldObj interface{} -} - -type concurrentUIDToNode struct { - *sync.RWMutex - uidToNode map[types.UID]*node -} - -func (m *concurrentUIDToNode) Write(node *node) { - m.Lock() - defer m.Unlock() - m.uidToNode[node.identity.UID] = node -} - -func (m *concurrentUIDToNode) Read(uid types.UID) (*node, bool) { - m.RLock() - defer m.RUnlock() - n, ok := m.uidToNode[uid] - return n, ok -} - -func (m *concurrentUIDToNode) Delete(uid types.UID) { - m.Lock() - defer m.Unlock() - delete(m.uidToNode, uid) -} - -type Propagator struct { - eventQueue *workqueue.TimedWorkQueue - // uidToNode doesn't require a lock to protect, because only the - // single-threaded Propagator.processEvent() reads/writes it. - uidToNode *concurrentUIDToNode - gc *GarbageCollector -} - -// addDependentToOwners adds n to owners' dependents list. If the owner does not -// exist in the p.uidToNode yet, a "virtual" node will be created to represent -// the owner. The "virtual" node will be enqueued to the dirtyQueue, so that -// processItem() will verify if the owner exists according to the API server. -func (p *Propagator) addDependentToOwners(n *node, owners []metav1.OwnerReference) { - for _, owner := range owners { - ownerNode, ok := p.uidToNode.Read(owner.UID) - if !ok { - // Create a "virtual" node in the graph for the owner if it doesn't - // exist in the graph yet. Then enqueue the virtual node into the - // dirtyQueue. The garbage processor will enqueue a virtual delete - // event to delete it from the graph if API server confirms this - // owner doesn't exist. 
- ownerNode = &node{ - identity: objectReference{ - OwnerReference: owner, - Namespace: n.identity.Namespace, - }, - dependents: make(map[*node]struct{}), - } - glog.V(6).Infof("add virtual node.identity: %s\n\n", ownerNode.identity) - p.uidToNode.Write(ownerNode) - p.gc.dirtyQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: p.gc.clock.Now(), Object: ownerNode}) - } - ownerNode.addDependent(n) - } -} - -// insertNode insert the node to p.uidToNode; then it finds all owners as listed -// in n.owners, and adds the node to their dependents list. -func (p *Propagator) insertNode(n *node) { - p.uidToNode.Write(n) - p.addDependentToOwners(n, n.owners) -} - -// removeDependentFromOwners remove n from owners' dependents list. -func (p *Propagator) removeDependentFromOwners(n *node, owners []metav1.OwnerReference) { - for _, owner := range owners { - ownerNode, ok := p.uidToNode.Read(owner.UID) - if !ok { - continue - } - ownerNode.deleteDependent(n) - } -} - -// removeNode removes the node from p.uidToNode, then finds all -// owners as listed in n.owners, and removes n from their dependents list. -func (p *Propagator) removeNode(n *node) { - p.uidToNode.Delete(n.identity.UID) - p.removeDependentFromOwners(n, n.owners) -} - -// TODO: profile this function to see if a naive N^2 algorithm performs better -// when the number of references is small. -func referencesDiffs(old []metav1.OwnerReference, new []metav1.OwnerReference) (added []metav1.OwnerReference, removed []metav1.OwnerReference) { - oldUIDToRef := make(map[string]metav1.OwnerReference) - for i := 0; i < len(old); i++ { - oldUIDToRef[string(old[i].UID)] = old[i] - } - oldUIDSet := sets.StringKeySet(oldUIDToRef) - newUIDToRef := make(map[string]metav1.OwnerReference) - for i := 0; i < len(new); i++ { - newUIDToRef[string(new[i].UID)] = new[i] - } - newUIDSet := sets.StringKeySet(newUIDToRef) - - addedUID := newUIDSet.Difference(oldUIDSet) - removedUID := oldUIDSet.Difference(newUIDSet) - - for uid := range addedUID { - added = append(added, newUIDToRef[uid]) - } - for uid := range removedUID { - removed = append(removed, oldUIDToRef[uid]) - } - return added, removed -} - -func shouldOrphanDependents(e *event, accessor metav1.Object) bool { - // The delta_fifo may combine the creation and update of the object into one - // event, so we need to check AddEvent as well. - if e.oldObj == nil { - if accessor.GetDeletionTimestamp() == nil { - return false - } - } else { - oldAccessor, err := meta.Accessor(e.oldObj) - if err != nil { - utilruntime.HandleError(fmt.Errorf("cannot access oldObj: %v", err)) - return false - } - // ignore the event if it's not updating DeletionTimestamp from non-nil to nil. - if accessor.GetDeletionTimestamp() == nil || oldAccessor.GetDeletionTimestamp() != nil { - return false - } - } - finalizers := accessor.GetFinalizers() - for _, finalizer := range finalizers { - if finalizer == metav1.FinalizerOrphan { - return true - } - } - return false -} - -// dependents are copies of pointers to the owner's dependents, they don't need to be locked. 
-func (gc *GarbageCollector) orhpanDependents(owner objectReference, dependents []*node) error { - var failedDependents []objectReference - var errorsSlice []error - for _, dependent := range dependents { - // the dependent.identity.UID is used as precondition - deleteOwnerRefPatch := fmt.Sprintf(`{"metadata":{"ownerReferences":[{"$patch":"delete","uid":"%s"}],"uid":"%s"}}`, owner.UID, dependent.identity.UID) - _, err := gc.patchObject(dependent.identity, []byte(deleteOwnerRefPatch)) - // note that if the target ownerReference doesn't exist in the - // dependent, strategic merge patch will NOT return an error. - if err != nil && !errors.IsNotFound(err) { - errorsSlice = append(errorsSlice, fmt.Errorf("orphaning %s failed with %v", dependent.identity, err)) - } - } - if len(failedDependents) != 0 { - return fmt.Errorf("failed to orphan dependents of owner %s, got errors: %s", owner, utilerrors.NewAggregate(errorsSlice).Error()) - } - glog.V(6).Infof("successfully updated all dependents") - return nil -} - -// TODO: Using Patch when strategicmerge supports deleting an entry from a -// slice of a base type. -func (gc *GarbageCollector) removeOrphanFinalizer(owner *node) error { - const retries = 5 - for count := 0; count < retries; count++ { - ownerObject, err := gc.getObject(owner.identity) - if err != nil { - return fmt.Errorf("cannot finalize owner %s, because cannot get it. The garbage collector will retry later.", owner.identity) - } - accessor, err := meta.Accessor(ownerObject) - if err != nil { - return fmt.Errorf("cannot access the owner object: %v. The garbage collector will retry later.", err) - } - finalizers := accessor.GetFinalizers() - var newFinalizers []string - found := false - for _, f := range finalizers { - if f == metav1.FinalizerOrphan { - found = true - break - } else { - newFinalizers = append(newFinalizers, f) - } - } - if !found { - glog.V(6).Infof("the orphan finalizer is already removed from object %s", owner.identity) - return nil - } - // remove the owner from dependent's OwnerReferences - ownerObject.SetFinalizers(newFinalizers) - _, err = gc.updateObject(owner.identity, ownerObject) - if err == nil { - return nil - } - if err != nil && !errors.IsConflict(err) { - return fmt.Errorf("cannot update the finalizers of owner %s, with error: %v, tried %d times", owner.identity, err, count+1) - } - // retry if it's a conflict - glog.V(6).Infof("got conflict updating the owner object %s, tried %d times", owner.identity, count+1) - } - return fmt.Errorf("updateMaxRetries(%d) has reached. The garbage collector will retry later for owner %v.", retries, owner.identity) -} - -// orphanFinalizer dequeues a node from the orphanQueue, then finds its dependents -// based on the graph maintained by the GC, then removes it from the -// OwnerReferences of its dependents, and finally updates the owner to remove -// the "Orphan" finalizer. The node is add back into the orphanQueue if any of -// these steps fail. 
-func (gc *GarbageCollector) orphanFinalizer() { - timedItem, quit := gc.orphanQueue.Get() - if quit { - return - } - defer gc.orphanQueue.Done(timedItem) - owner, ok := timedItem.Object.(*node) - if !ok { - utilruntime.HandleError(fmt.Errorf("expect *node, got %#v", timedItem.Object)) - } - // we don't need to lock each element, because they never get updated - owner.dependentsLock.RLock() - dependents := make([]*node, 0, len(owner.dependents)) - for dependent := range owner.dependents { - dependents = append(dependents, dependent) - } - owner.dependentsLock.RUnlock() - - err := gc.orhpanDependents(owner.identity, dependents) - if err != nil { - glog.V(6).Infof("orphanDependents for %s failed with %v", owner.identity, err) - gc.orphanQueue.Add(timedItem) - return - } - // update the owner, remove "orphaningFinalizer" from its finalizers list - err = gc.removeOrphanFinalizer(owner) - if err != nil { - glog.V(6).Infof("removeOrphanFinalizer for %s failed with %v", owner.identity, err) - gc.orphanQueue.Add(timedItem) - } - OrphanProcessingLatency.Observe(sinceInMicroseconds(gc.clock, timedItem.StartTime)) -} - -// Dequeueing an event from eventQueue, updating graph, populating dirty_queue. -func (p *Propagator) processEvent() { - timedItem, quit := p.eventQueue.Get() - if quit { - return - } - defer p.eventQueue.Done(timedItem) - event, ok := timedItem.Object.(*event) - if !ok { - utilruntime.HandleError(fmt.Errorf("expect a *event, got %v", timedItem.Object)) - return - } - obj := event.obj - accessor, err := meta.Accessor(obj) - if err != nil { - utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err)) - return - } - typeAccessor, err := meta.TypeAccessor(obj) - if err != nil { - utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err)) - return - } - glog.V(6).Infof("Propagator process object: %s/%s, namespace %s, name %s, event type %s", typeAccessor.GetAPIVersion(), typeAccessor.GetKind(), accessor.GetNamespace(), accessor.GetName(), event.eventType) - // Check if the node already exsits - existingNode, found := p.uidToNode.Read(accessor.GetUID()) - switch { - case (event.eventType == addEvent || event.eventType == updateEvent) && !found: - newNode := &node{ - identity: objectReference{ - OwnerReference: metav1.OwnerReference{ - APIVersion: typeAccessor.GetAPIVersion(), - Kind: typeAccessor.GetKind(), - UID: accessor.GetUID(), - Name: accessor.GetName(), - }, - Namespace: accessor.GetNamespace(), - }, - dependents: make(map[*node]struct{}), - owners: accessor.GetOwnerReferences(), - } - p.insertNode(newNode) - // the underlying delta_fifo may combine a creation and deletion into one event - if shouldOrphanDependents(event, accessor) { - glog.V(6).Infof("add %s to the orphanQueue", newNode.identity) - p.gc.orphanQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: p.gc.clock.Now(), Object: newNode}) - } - case (event.eventType == addEvent || event.eventType == updateEvent) && found: - // caveat: if GC observes the creation of the dependents later than the - // deletion of the owner, then the orphaning finalizer won't be effective. 
-		if shouldOrphanDependents(event, accessor) {
-			glog.V(6).Infof("add %s to the orphanQueue", existingNode.identity)
-			p.gc.orphanQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: p.gc.clock.Now(), Object: existingNode})
-		}
-		// add/remove owner refs
-		added, removed := referencesDiffs(existingNode.owners, accessor.GetOwnerReferences())
-		if len(added) == 0 && len(removed) == 0 {
-			glog.V(6).Infof("The updateEvent %#v doesn't change node references, ignore", event)
-			return
-		}
-		// update the node itself
-		existingNode.owners = accessor.GetOwnerReferences()
-		// Add the node to its new owners' dependent lists.
-		p.addDependentToOwners(existingNode, added)
-		// remove the node from the dependent list of node that are no long in
-		// the node's owners list.
-		p.removeDependentFromOwners(existingNode, removed)
-	case event.eventType == deleteEvent:
-		if !found {
-			glog.V(6).Infof("%v doesn't exist in the graph, this shouldn't happen", accessor.GetUID())
-			return
-		}
-		p.removeNode(existingNode)
-		existingNode.dependentsLock.RLock()
-		defer existingNode.dependentsLock.RUnlock()
-		if len(existingNode.dependents) > 0 {
-			p.gc.absentOwnerCache.Add(accessor.GetUID())
-		}
-		for dep := range existingNode.dependents {
-			p.gc.dirtyQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: p.gc.clock.Now(), Object: dep})
-		}
-	}
-	EventProcessingLatency.Observe(sinceInMicroseconds(p.gc.clock, timedItem.StartTime))
-}
-
-// GarbageCollector is responsible for carrying out cascading deletion, and
-// removing ownerReferences from the dependents if the owner is deleted with
-// DeleteOptions.OrphanDependents=true.
+// GarbageCollector runs reflectors to watch for changes of managed API
+// objects, funnels the results to a single-threaded dependencyGraphBuilder,
+// which builds a graph caching the dependencies among objects. Triggered by the
+// graph changes, the dependencyGraphBuilder enqueues objects that can
+// potentially be garbage-collected to the `attemptToDelete` queue, and enqueues
+// objects whose dependents need to be orphaned to the `attemptToOrphan` queue.
+// The GarbageCollector has workers who consume these two queues, send requests
+// to the API server to delete/update the objects accordingly.
+// Note that having the dependencyGraphBuilder notify the garbage collector
+// ensures that the garbage collector operates with a graph that is at least as
+// up to date as the notification is sent.
 type GarbageCollector struct {
 	restMapper meta.RESTMapper
-	// metaOnlyClientPool uses a special codec, which removes fields except for
-	// apiVersion, kind, and metadata during decoding.
-	metaOnlyClientPool dynamic.ClientPool
 	// clientPool uses the regular dynamicCodec. We need it to update
 	// finalizers. It can be removed if we support patching finalizers.
-	clientPool  dynamic.ClientPool
-	dirtyQueue  *workqueue.TimedWorkQueue
-	orphanQueue *workqueue.TimedWorkQueue
-	monitors    []monitor
-	propagator  *Propagator
-	clock       clock.Clock
-	registeredRateLimiter            *RegisteredRateLimiter
-	registeredRateLimiterForMonitors *RegisteredRateLimiter
+	clientPool dynamic.ClientPool
+	// garbage collector attempts to delete the items in attemptToDelete queue when the time is ripe.
+	attemptToDelete workqueue.RateLimitingInterface
+	// garbage collector attempts to orphan the dependents of the items in the attemptToOrphan queue, then deletes the items.
+ attemptToOrphan workqueue.RateLimitingInterface + dependencyGraphBuilder *GraphBuilder + // used to register exactly once the rate limiter of the dynamic client + // used by the garbage collector controller. + registeredRateLimiter *RegisteredRateLimiter // GC caches the owners that do not exist according to the API server. absentOwnerCache *UIDCache } -func gcListWatcher(client *dynamic.Client, resource schema.GroupVersionResource) *cache.ListWatch { - return &cache.ListWatch{ - ListFunc: func(options metav1.ListOptions) (runtime.Object, error) { - // APIResource.Kind is not used by the dynamic client, so - // leave it empty. We want to list this resource in all - // namespaces if it's namespace scoped, so leave - // APIResource.Namespaced as false is all right. - apiResource := metav1.APIResource{Name: resource.Resource} - return client.ParameterCodec(dynamic.VersionedParameterEncoderWithV1Fallback). - Resource(&apiResource, metav1.NamespaceAll). - List(options) - }, - WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) { - // APIResource.Kind is not used by the dynamic client, so - // leave it empty. We want to list this resource in all - // namespaces if it's namespace scoped, so leave - // APIResource.Namespaced as false is all right. - apiResource := metav1.APIResource{Name: resource.Resource} - return client.ParameterCodec(dynamic.VersionedParameterEncoderWithV1Fallback). - Resource(&apiResource, metav1.NamespaceAll). - Watch(options) - }, - } -} - -func (gc *GarbageCollector) monitorFor(resource schema.GroupVersionResource, kind schema.GroupVersionKind) (monitor, error) { - // TODO: consider store in one storage. - glog.V(6).Infof("create storage for resource %s", resource) - var monitor monitor - client, err := gc.metaOnlyClientPool.ClientForGroupVersionKind(kind) - if err != nil { - return monitor, err - } - gc.registeredRateLimiterForMonitors.registerIfNotPresent(resource.GroupVersion(), client, "garbage_collector_monitoring") - setObjectTypeMeta := func(obj interface{}) { - runtimeObject, ok := obj.(runtime.Object) - if !ok { - utilruntime.HandleError(fmt.Errorf("expected runtime.Object, got %#v", obj)) - } - runtimeObject.GetObjectKind().SetGroupVersionKind(kind) - } - monitor.store, monitor.controller = cache.NewInformer( - gcListWatcher(client, resource), - nil, - ResourceResyncTime, - cache.ResourceEventHandlerFuncs{ - // add the event to the propagator's eventQueue. 
- AddFunc: func(obj interface{}) { - setObjectTypeMeta(obj) - event := &event{ - eventType: addEvent, - obj: obj, - } - gc.propagator.eventQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: gc.clock.Now(), Object: event}) - }, - UpdateFunc: func(oldObj, newObj interface{}) { - setObjectTypeMeta(newObj) - event := &event{updateEvent, newObj, oldObj} - gc.propagator.eventQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: gc.clock.Now(), Object: event}) - }, - DeleteFunc: func(obj interface{}) { - // delta fifo may wrap the object in a cache.DeletedFinalStateUnknown, unwrap it - if deletedFinalStateUnknown, ok := obj.(cache.DeletedFinalStateUnknown); ok { - obj = deletedFinalStateUnknown.Obj - } - setObjectTypeMeta(obj) - event := &event{ - eventType: deleteEvent, - obj: obj, - } - gc.propagator.eventQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: gc.clock.Now(), Object: event}) - }, - }, - ) - return monitor, nil -} - -var ignoredResources = map[schema.GroupVersionResource]struct{}{ - schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "replicationcontrollers"}: {}, - schema.GroupVersionResource{Group: "", Version: "v1", Resource: "bindings"}: {}, - schema.GroupVersionResource{Group: "", Version: "v1", Resource: "componentstatuses"}: {}, - schema.GroupVersionResource{Group: "", Version: "v1", Resource: "events"}: {}, - schema.GroupVersionResource{Group: "authentication.k8s.io", Version: "v1beta1", Resource: "tokenreviews"}: {}, - schema.GroupVersionResource{Group: "authentication.k8s.io", Version: "v1", Resource: "tokenreviews"}: {}, - schema.GroupVersionResource{Group: "authorization.k8s.io", Version: "v1beta1", Resource: "subjectaccessreviews"}: {}, - schema.GroupVersionResource{Group: "authorization.k8s.io", Version: "v1beta1", Resource: "selfsubjectaccessreviews"}: {}, - schema.GroupVersionResource{Group: "authorization.k8s.io", Version: "v1beta1", Resource: "localsubjectaccessreviews"}: {}, - schema.GroupVersionResource{Group: "authorization.k8s.io", Version: "v1", Resource: "subjectaccessreviews"}: {}, - schema.GroupVersionResource{Group: "authorization.k8s.io", Version: "v1", Resource: "selfsubjectaccessreviews"}: {}, - schema.GroupVersionResource{Group: "authorization.k8s.io", Version: "v1", Resource: "localsubjectaccessreviews"}: {}, -} - func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynamic.ClientPool, mapper meta.RESTMapper, deletableResources map[schema.GroupVersionResource]struct{}) (*GarbageCollector, error) { + attemptToDelete := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_delete") + attemptToOrphan := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_attempt_to_orphan") + absentOwnerCache := NewUIDCache(500) gc := &GarbageCollector{ - metaOnlyClientPool: metaOnlyClientPool, - clientPool: clientPool, - restMapper: mapper, - clock: clock.RealClock{}, - dirtyQueue: workqueue.NewTimedWorkQueue(), - orphanQueue: workqueue.NewTimedWorkQueue(), - registeredRateLimiter: NewRegisteredRateLimiter(deletableResources), - registeredRateLimiterForMonitors: NewRegisteredRateLimiter(deletableResources), - absentOwnerCache: NewUIDCache(500), + clientPool: clientPool, + restMapper: mapper, + attemptToDelete: attemptToDelete, + attemptToOrphan: attemptToOrphan, + registeredRateLimiter: NewRegisteredRateLimiter(deletableResources), + absentOwnerCache: absentOwnerCache, } - gc.propagator = &Propagator{ - eventQueue: 
workqueue.NewTimedWorkQueue(), + gb := &GraphBuilder{ + metaOnlyClientPool: metaOnlyClientPool, + registeredRateLimiterForControllers: NewRegisteredRateLimiter(deletableResources), + restMapper: mapper, + graphChanges: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "garbage_collector_graph_changes"), uidToNode: &concurrentUIDToNode{ - RWMutex: &sync.RWMutex{}, uidToNode: make(map[types.UID]*node), }, - gc: gc, + attemptToDelete: attemptToDelete, + attemptToOrphan: attemptToOrphan, + absentOwnerCache: absentOwnerCache, } - for resource := range deletableResources { - if _, ok := ignoredResources[resource]; ok { - glog.V(6).Infof("ignore resource %#v", resource) - continue - } - kind, err := gc.restMapper.KindFor(resource) - if err != nil { - if _, ok := err.(*meta.NoResourceMatchError); ok { - // ignore NoResourceMatchErrors for now because TPRs won't be registered - // and hence the RestMapper does not know about them. The deletableResources - // though are using discovery which included TPRs. - // TODO: use dynamic discovery for RestMapper and deletableResources - glog.Warningf("ignore NoResourceMatchError for %v", resource) - continue - } - return nil, err - } - monitor, err := gc.monitorFor(resource, kind) - if err != nil { - return nil, err - } - gc.monitors = append(gc.monitors, monitor) + if err := gb.monitorsForResources(deletableResources); err != nil { + return nil, err } + gc.dependencyGraphBuilder = gb + return gc, nil } -func (gc *GarbageCollector) worker() { - timedItem, quit := gc.dirtyQueue.Get() +func (gc *GarbageCollector) Run(workers int, stopCh <-chan struct{}) { + defer gc.attemptToDelete.ShutDown() + defer gc.attemptToOrphan.ShutDown() + defer gc.dependencyGraphBuilder.graphChanges.ShutDown() + + glog.Infof("Garbage Collector: Initializing") + gc.dependencyGraphBuilder.Run(stopCh) + if !cache.WaitForCacheSync(stopCh, gc.dependencyGraphBuilder.HasSynced) { + return + } + glog.Infof("Garbage Collector: All resource monitors have synced. Proceeding to collect garbage") + + // gc workers + for i := 0; i < workers; i++ { + go wait.Until(gc.runAttemptToDeleteWorker, 1*time.Second, stopCh) + go wait.Until(gc.runAttemptToOrphanWorker, 1*time.Second, stopCh) + } + Register() + <-stopCh + glog.Infof("Garbage Collector: Shutting down") +} + +func (gc *GarbageCollector) runAttemptToDeleteWorker() { + for gc.attemptToDeleteWorker() { + } +} + +func (gc *GarbageCollector) attemptToDeleteWorker() bool { + item, quit := gc.attemptToDelete.Get() if quit { - return + return false } - defer gc.dirtyQueue.Done(timedItem) - err := gc.processItem(timedItem.Object.(*node)) + defer gc.attemptToDelete.Done(item) + n, ok := item.(*node) + if !ok { + utilruntime.HandleError(fmt.Errorf("expect *node, got %#v", item)) + return true + } + err := gc.attemptToDeleteItem(n) if err != nil { - utilruntime.HandleError(fmt.Errorf("Error syncing item %#v: %v", timedItem.Object, err)) + utilruntime.HandleError(fmt.Errorf("Error syncing item %#v: %v", n, err)) // retry if garbage collection of an object failed. - gc.dirtyQueue.Add(timedItem) - return + gc.attemptToDelete.AddRateLimited(item) } - DirtyProcessingLatency.Observe(sinceInMicroseconds(gc.clock, timedItem.StartTime)) -} - -// apiResource consults the REST mapper to translate an tuple to a metav1.APIResource struct. 
-func (gc *GarbageCollector) apiResource(apiVersion, kind string, namespaced bool) (*metav1.APIResource, error) { - fqKind := schema.FromAPIVersionAndKind(apiVersion, kind) - mapping, err := gc.restMapper.RESTMapping(fqKind.GroupKind(), apiVersion) - if err != nil { - return nil, fmt.Errorf("unable to get REST mapping for kind: %s, version: %s", kind, apiVersion) - } - glog.V(6).Infof("map kind %s, version %s to resource %s", kind, apiVersion, mapping.Resource) - resource := metav1.APIResource{ - Name: mapping.Resource, - Namespaced: namespaced, - Kind: kind, - } - return &resource, nil -} - -func (gc *GarbageCollector) deleteObject(item objectReference) error { - fqKind := schema.FromAPIVersionAndKind(item.APIVersion, item.Kind) - client, err := gc.clientPool.ClientForGroupVersionKind(fqKind) - gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation") - resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0) - if err != nil { - return err - } - uid := item.UID - preconditions := metav1.Preconditions{UID: &uid} - deleteOptions := metav1.DeleteOptions{Preconditions: &preconditions} - return client.Resource(resource, item.Namespace).Delete(item.Name, &deleteOptions) -} - -func (gc *GarbageCollector) getObject(item objectReference) (*unstructured.Unstructured, error) { - fqKind := schema.FromAPIVersionAndKind(item.APIVersion, item.Kind) - client, err := gc.clientPool.ClientForGroupVersionKind(fqKind) - gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation") - resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0) - if err != nil { - return nil, err - } - return client.Resource(resource, item.Namespace).Get(item.Name) -} - -func (gc *GarbageCollector) updateObject(item objectReference, obj *unstructured.Unstructured) (*unstructured.Unstructured, error) { - fqKind := schema.FromAPIVersionAndKind(item.APIVersion, item.Kind) - client, err := gc.clientPool.ClientForGroupVersionKind(fqKind) - gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation") - resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0) - if err != nil { - return nil, err - } - return client.Resource(resource, item.Namespace).Update(obj) -} - -func (gc *GarbageCollector) patchObject(item objectReference, patch []byte) (*unstructured.Unstructured, error) { - fqKind := schema.FromAPIVersionAndKind(item.APIVersion, item.Kind) - client, err := gc.clientPool.ClientForGroupVersionKind(fqKind) - gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation") - resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0) - if err != nil { - return nil, err - } - return client.Resource(resource, item.Namespace).Patch(item.Name, types.StrategicMergePatchType, patch) -} - -func objectReferenceToUnstructured(ref objectReference) *unstructured.Unstructured { - ret := &unstructured.Unstructured{} - ret.SetKind(ref.Kind) - ret.SetAPIVersion(ref.APIVersion) - ret.SetUID(ref.UID) - ret.SetNamespace(ref.Namespace) - ret.SetName(ref.Name) - return ret + return true } func objectReferenceToMetadataOnlyObject(ref objectReference) *metaonly.MetadataOnlyObject { @@ -692,121 +164,284 @@ func objectReferenceToMetadataOnlyObject(ref objectReference) *metaonly.Metadata } } -func (gc *GarbageCollector) processItem(item *node) error { - // Get the 
latest item from the API server - latest, err := gc.getObject(item.identity) +// isDangling check if a reference is pointing to an object that doesn't exist. +// If isDangling looks up the referenced object at the API server, it also +// returns its latest state. +func (gc *GarbageCollector) isDangling(reference metav1.OwnerReference, item *node) ( + dangling bool, owner *unstructured.Unstructured, err error) { + if gc.absentOwnerCache.Has(reference.UID) { + glog.V(5).Infof("according to the absentOwnerCache, object %s's owner %s/%s, %s does not exist", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name) + return true, nil, nil + } + // TODO: we need to verify the reference resource is supported by the + // system. If it's not a valid resource, the garbage collector should i) + // ignore the reference when decide if the object should be deleted, and + // ii) should update the object to remove such references. This is to + // prevent objects having references to an old resource from being + // deleted during a cluster upgrade. + fqKind := schema.FromAPIVersionAndKind(reference.APIVersion, reference.Kind) + client, err := gc.clientPool.ClientForGroupVersionKind(fqKind) if err != nil { - if errors.IsNotFound(err) { - // the Propagator can add "virtual" node for an owner that doesn't - // exist yet, so we need to enqueue a virtual Delete event to remove - // the virtual node from Propagator.uidToNode. - glog.V(6).Infof("item %v not found, generating a virtual delete event", item.identity) - event := &event{ - eventType: deleteEvent, - obj: objectReferenceToMetadataOnlyObject(item.identity), - } - glog.V(6).Infof("generating virtual delete event for %s\n\n", event.obj) - gc.propagator.eventQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: gc.clock.Now(), Object: event}) - return nil + return false, nil, err + } + resource, err := gc.apiResource(reference.APIVersion, reference.Kind, len(item.identity.Namespace) != 0) + if err != nil { + return false, nil, err + } + // TODO: It's only necessary to talk to the API server if the owner node + // is a "virtual" node. The local graph could lag behind the real + // status, but in practice, the difference is small. + owner, err = client.Resource(resource, item.identity.Namespace).Get(reference.Name) + switch { + case errors.IsNotFound(err): + gc.absentOwnerCache.Add(reference.UID) + glog.V(5).Infof("object %s's owner %s/%s, %s is not found", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name) + return true, nil, nil + case err != nil: + return false, nil, err + } + + if owner.GetUID() != reference.UID { + glog.V(5).Infof("object %s's owner %s/%s, %s is not found, UID mismatch", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name) + gc.absentOwnerCache.Add(reference.UID) + return true, nil, nil + } + return false, owner, nil +} + +// classify the latestReferences to three categories: +// solid: the owner exists, and is not "waitingForDependentsDeletion" +// dangling: the owner does not exist +// waitingForDependentsDeletion: the owner exists, its deletionTimestamp is non-nil, and it has +// FinalizerDeletingDependents +// This function communicates with the server. 
+func (gc *GarbageCollector) classifyReferences(item *node, latestReferences []metav1.OwnerReference) ( + solid, dangling, waitingForDependentsDeletion []metav1.OwnerReference, err error) { + for _, reference := range latestReferences { + isDangling, owner, err := gc.isDangling(reference, item) + if err != nil { + return nil, nil, nil, err } - return err - } - if latest.GetUID() != item.identity.UID { - glog.V(6).Infof("UID doesn't match, item %v not found, generating a virtual delete event", item.identity) - event := &event{ - eventType: deleteEvent, - obj: objectReferenceToMetadataOnlyObject(item.identity), - } - glog.V(6).Infof("generating virtual delete event for %s\n\n", event.obj) - gc.propagator.eventQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: gc.clock.Now(), Object: event}) - return nil - } - ownerReferences := latest.GetOwnerReferences() - if len(ownerReferences) == 0 { - glog.V(6).Infof("object %s's doesn't have an owner, continue on next item", item.identity) - return nil - } - // TODO: we need to remove dangling references if the object is not to be - // deleted. - for _, reference := range ownerReferences { - if gc.absentOwnerCache.Has(reference.UID) { - glog.V(6).Infof("according to the absentOwnerCache, object %s's owner %s/%s, %s does not exist", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name) + if isDangling { + dangling = append(dangling, reference) continue } - // TODO: we need to verify the reference resource is supported by the - // system. If it's not a valid resource, the garbage collector should i) - // ignore the reference when decide if the object should be deleted, and - // ii) should update the object to remove such references. This is to - // prevent objects having references to an old resource from being - // deleted during a cluster upgrade. 
- fqKind := schema.FromAPIVersionAndKind(reference.APIVersion, reference.Kind) - client, err := gc.clientPool.ClientForGroupVersionKind(fqKind) + + ownerAccessor, err := meta.Accessor(owner) if err != nil { - return err + return nil, nil, nil, err } - resource, err := gc.apiResource(reference.APIVersion, reference.Kind, len(item.identity.Namespace) != 0) - if err != nil { - return err - } - owner, err := client.Resource(resource, item.identity.Namespace).Get(reference.Name) - if err == nil { - if owner.GetUID() != reference.UID { - glog.V(6).Infof("object %s's owner %s/%s, %s is not found, UID mismatch", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name) - gc.absentOwnerCache.Add(reference.UID) - continue - } - glog.V(6).Infof("object %s has at least an existing owner, will not garbage collect", item.identity.UID) - return nil - } else if errors.IsNotFound(err) { - gc.absentOwnerCache.Add(reference.UID) - glog.V(6).Infof("object %s's owner %s/%s, %s is not found", item.identity.UID, reference.APIVersion, reference.Kind, reference.Name) + if ownerAccessor.GetDeletionTimestamp() != nil && hasDeleteDependentsFinalizer(ownerAccessor) { + waitingForDependentsDeletion = append(waitingForDependentsDeletion, reference) } else { - return err + solid = append(solid, reference) } } - glog.V(2).Infof("none of object %s's owners exist any more, will garbage collect it", item.identity) - return gc.deleteObject(item.identity) + return solid, dangling, waitingForDependentsDeletion, nil } -func (gc *GarbageCollector) Run(workers int, stopCh <-chan struct{}) { - glog.Infof("Garbage Collector: Initializing") - for _, monitor := range gc.monitors { - go monitor.controller.Run(stopCh) +func (gc *GarbageCollector) generateVirtualDeleteEvent(identity objectReference) { + event := &event{ + eventType: deleteEvent, + obj: objectReferenceToMetadataOnlyObject(identity), + } + glog.V(5).Infof("generating virtual delete event for %s\n\n", event.obj) + gc.dependencyGraphBuilder.enqueueChanges(event) +} + +func ownerRefsToUIDs(refs []metav1.OwnerReference) []types.UID { + var ret []types.UID + for _, ref := range refs { + ret = append(ret, ref.UID) + } + return ret +} + +func (gc *GarbageCollector) attemptToDeleteItem(item *node) error { + glog.V(2).Infof("processing item %s", item.identity) + // "being deleted" is an one-way trip to the final deletion. We'll just wait for the final deletion, and then process the object's dependents. + if item.isBeingDeleted() && !item.isDeletingDependents() { + glog.V(5).Infof("processing item %s returned at once, because its DeletionTimestamp is non-nil", item.identity) + return nil + } + // TODO: It's only necessary to talk to the API server if this is a + // "virtual" node. The local graph could lag behind the real status, but in + // practice, the difference is small. + latest, err := gc.getObject(item.identity) + switch { + case errors.IsNotFound(err): + // the GraphBuilder can add "virtual" node for an owner that doesn't + // exist yet, so we need to enqueue a virtual Delete event to remove + // the virtual node from GraphBuilder.uidToNode. 
+ glog.V(5).Infof("item %v not found, generating a virtual delete event", item.identity) + gc.generateVirtualDeleteEvent(item.identity) + return nil + case err != nil: + return err } - wait.PollInfinite(10*time.Second, func() (bool, error) { - for _, monitor := range gc.monitors { - if !monitor.controller.HasSynced() { - glog.Infof("Garbage Collector: Waiting for resource monitors to be synced...") - return false, nil + if latest.GetUID() != item.identity.UID { + glog.V(5).Infof("UID doesn't match, item %v not found, generating a virtual delete event", item.identity) + gc.generateVirtualDeleteEvent(item.identity) + return nil + } + + // TODO: attemptToOrphanWorker() routine is similar. Consider merging + // attemptToOrphanWorker() into attemptToDeleteItem() as well. + if item.isDeletingDependents() { + return gc.processDeletingDependentsItem(item) + } + + // compute if we should delete the item + ownerReferences := latest.GetOwnerReferences() + if len(ownerReferences) == 0 { + glog.V(2).Infof("object %s's doesn't have an owner, continue on next item", item.identity) + return nil + } + + solid, dangling, waitingForDependentsDeletion, err := gc.classifyReferences(item, ownerReferences) + if err != nil { + return err + } + glog.V(5).Infof("classify references of %s.\nsolid: %#v\ndangling: %#v\nwaitingForDependentsDeletion: %#v\n", item.identity, solid, dangling, waitingForDependentsDeletion) + + switch { + case len(solid) != 0: + glog.V(2).Infof("object %s has at least one existing owner: %#v, will not garbage collect", solid, item.identity) + if len(dangling) != 0 || len(waitingForDependentsDeletion) != 0 { + glog.V(2).Infof("remove dangling references %#v and waiting references %#v for object %s", dangling, waitingForDependentsDeletion, item.identity) + } + // waitingForDependentsDeletion needs to be deleted from the + // ownerReferences, otherwise the referenced objects will be stuck with + // the FinalizerDeletingDependents and never get deleted. + patch := deleteOwnerRefPatch(item.identity.UID, append(ownerRefsToUIDs(dangling), ownerRefsToUIDs(waitingForDependentsDeletion)...)...) + _, err = gc.patchObject(item.identity, patch) + return err + case len(waitingForDependentsDeletion) != 0 && item.dependentsLength() != 0: + deps := item.getDependents() + for _, dep := range deps { + if dep.isDeletingDependents() { + // this circle detection has false positives, we need to + // apply a more rigorous detection if this turns out to be a + // problem. + // there are multiple workers run attemptToDeleteItem in + // parallel, the circle detection can fail in a race condition. + glog.V(2).Infof("processing object %s, some of its owners and its dependent [%s] have FinalizerDeletingDependents, to prevent potential cycle, its ownerReferences are going to be modified to be non-blocking, then the object is going to be deleted with Foreground", item.identity, dep.identity) + patch, err := item.patchToUnblockOwnerReferences() + if err != nil { + return err + } + if _, err := gc.patchObject(item.identity, patch); err != nil { + return err + } + break } } - return true, nil - }) - glog.Infof("Garbage Collector: All monitored resources synced. 
Proceeding to collect garbage") - - // worker - go wait.Until(gc.propagator.processEvent, 0, stopCh) - - for i := 0; i < workers; i++ { - go wait.Until(gc.worker, 0, stopCh) - go wait.Until(gc.orphanFinalizer, 0, stopCh) + glog.V(2).Infof("at least one owner of object %s has FinalizerDeletingDependents, and the object itself has dependents, so it is going to be deleted with Foreground", item.identity) + // the deletion event will be observed by the graphBuilder, so the item + // will be processed again in processDeletingDependentsItem. If it + // doesn't have dependents, the function will remove the + // FinalizerDeletingDependents from the item, resulting in the final + // deletion of the item. + policy := metav1.DeletePropagationForeground + return gc.deleteObject(item.identity, &policy) + default: + // item doesn't have any solid owner, so it needs to be garbage + // collected. Also, none of item's owners is waiting for the deletion of + // the dependents, so GC deletes item with Default. + glog.V(2).Infof("delete object %s with Default", item.identity) + return gc.deleteObject(item.identity, nil) } - Register() - <-stopCh - glog.Infof("Garbage Collector: Shutting down") - gc.dirtyQueue.ShutDown() - gc.orphanQueue.ShutDown() - gc.propagator.eventQueue.ShutDown() } -// *FOR TEST USE ONLY* It's not safe to call this function when the GC is still -// busy. -// GraphHasUID returns if the Propagator has a particular UID store in its +// process item that's waiting for its dependents to be deleted +func (gc *GarbageCollector) processDeletingDependentsItem(item *node) error { + blockingDependents := item.blockingDependents() + if len(blockingDependents) == 0 { + glog.V(2).Infof("remove DeleteDependents finalizer for item %s", item.identity) + return gc.removeFinalizer(item, metav1.FinalizerDeleteDependents) + } + for _, dep := range blockingDependents { + if !dep.isDeletingDependents() { + glog.V(2).Infof("adding %s to attemptToDelete, because its owner %s is deletingDependents", dep.identity, item.identity) + gc.attemptToDelete.Add(dep) + } + } + return nil +} + +// dependents are copies of pointers to the owner's dependents, they don't need to be locked. +func (gc *GarbageCollector) orphanDependents(owner objectReference, dependents []*node) error { + var failedDependents []objectReference + var errorsSlice []error + for _, dependent := range dependents { + // the dependent.identity.UID is used as precondition + patch := deleteOwnerRefPatch(dependent.identity.UID, owner.UID) + _, err := gc.patchObject(dependent.identity, patch) + // note that if the target ownerReference doesn't exist in the + // dependent, strategic merge patch will NOT return an error. + if err != nil && !errors.IsNotFound(err) { + errorsSlice = append(errorsSlice, fmt.Errorf("orphaning %s failed with %v", dependent.identity, err)) + } + } + if len(failedDependents) != 0 { + return fmt.Errorf("failed to orphan dependents of owner %s, got errors: %s", owner, utilerrors.NewAggregate(errorsSlice).Error()) + } + glog.V(5).Infof("successfully updated all dependents of owner %s", owner) + return nil +} + +func (gc *GarbageCollector) runAttemptToOrphanWorker() { + for gc.attemptToOrphanWorker() { + } +} + +// attemptToOrphanWorker dequeues a node from the attemptToOrphan, then finds its +// dependents based on the graph maintained by the GC, then removes it from the +// OwnerReferences of its dependents, and finally updates the owner to remove +// the "Orphan" finalizer. 
The node is added back into the attemptToOrphan if any of +// these steps fail. +func (gc *GarbageCollector) attemptToOrphanWorker() bool { + item, quit := gc.attemptToOrphan.Get() + if quit { + return false + } + defer gc.attemptToOrphan.Done(item) + owner, ok := item.(*node) + if !ok { + utilruntime.HandleError(fmt.Errorf("expect *node, got %#v", item)) + return true + } + // we don't need to lock each element, because they never get updated + owner.dependentsLock.RLock() + dependents := make([]*node, 0, len(owner.dependents)) + for dependent := range owner.dependents { + dependents = append(dependents, dependent) + } + owner.dependentsLock.RUnlock() + + err := gc.orphanDependents(owner.identity, dependents) + if err != nil { + glog.V(5).Infof("orphanDependents for %s failed with %v", owner.identity, err) + gc.attemptToOrphan.AddRateLimited(item) + return true + } + // update the owner, remove "orphaningFinalizer" from its finalizers list + err = gc.removeFinalizer(owner, metav1.FinalizerOrphanDependents) + if err != nil { + glog.V(5).Infof("removeOrphanFinalizer for %s failed with %v", owner.identity, err) + gc.attemptToOrphan.AddRateLimited(item) + } + return true +} + +// *FOR TEST USE ONLY* +// GraphHasUID returns if the GraphBuilder has a particular UID store in its // uidToNode graph. It's useful for debugging. +// This method is used by integration tests. func (gc *GarbageCollector) GraphHasUID(UIDs []types.UID) bool { for _, u := range UIDs { - if _, ok := gc.propagator.uidToNode.Read(u); ok { + if _, ok := gc.dependencyGraphBuilder.uidToNode.Read(u); ok { return true } } diff --git a/pkg/controller/garbagecollector/garbagecollector_test.go b/pkg/controller/garbagecollector/garbagecollector_test.go index ff658feda4b..901ae996799 100644 --- a/pkg/controller/garbagecollector/garbagecollector_test.go +++ b/pkg/controller/garbagecollector/garbagecollector_test.go @@ -19,6 +19,7 @@ package garbagecollector import ( "net/http" "net/http/httptest" + "reflect" "strings" "sync" "testing" @@ -27,15 +28,16 @@ import ( _ "k8s.io/kubernetes/pkg/api/install" + "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/serializer" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/json" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/strategicpatch" "k8s.io/client-go/dynamic" restclient "k8s.io/client-go/rest" - "k8s.io/client-go/util/clock" "k8s.io/client-go/util/workqueue" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/v1" @@ -53,7 +55,7 @@ func TestNewGarbageCollector(t *testing.T) { if err != nil { t.Fatal(err) } - assert.Equal(t, 1, len(gc.monitors)) + assert.Equal(t, 1, len(gc.dependencyGraphBuilder.monitors)) } // fakeAction records information about requests to aid in testing. @@ -142,8 +144,8 @@ func serilizeOrDie(t *testing.T, object interface{}) []byte { return data } -// test the processItem function making the expected actions. -func TestProcessItem(t *testing.T) { +// test the attemptToDeleteItem function making the expected actions. +func TestAttemptToDeleteItem(t *testing.T) { pod := getPod("ToBeDeletedPod", []metav1.OwnerReference{ { Kind: "ReplicationController", @@ -177,10 +179,10 @@ func TestProcessItem(t *testing.T) { }, Namespace: pod.Namespace, }, - // owners are intentionally left empty. The processItem routine should get the latest item from the server. + // owners are intentionally left empty. 
The attemptToDeleteItem routine should get the latest item from the server. owners: nil, } - err := gc.processItem(item) + err := gc.attemptToDeleteItem(item) if err != nil { t.Errorf("Unexpected Error: %v", err) } @@ -249,7 +251,7 @@ func TestProcessEvent(t *testing.T) { var testScenarios = []struct { name string // a series of events that will be supplied to the - // Propagator.eventQueue. + // GraphBuilder.eventQueue. events []event }{ { @@ -293,22 +295,19 @@ func TestProcessEvent(t *testing.T) { } for _, scenario := range testScenarios { - propagator := &Propagator{ - eventQueue: workqueue.NewTimedWorkQueue(), + dependencyGraphBuilder := &GraphBuilder{ + graphChanges: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), uidToNode: &concurrentUIDToNode{ - RWMutex: &sync.RWMutex{}, - uidToNode: make(map[types.UID]*node), - }, - gc: &GarbageCollector{ - dirtyQueue: workqueue.NewTimedWorkQueue(), - clock: clock.RealClock{}, - absentOwnerCache: NewUIDCache(2), + uidToNodeLock: sync.RWMutex{}, + uidToNode: make(map[types.UID]*node), }, + attemptToDelete: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter()), + absentOwnerCache: NewUIDCache(2), } for i := 0; i < len(scenario.events); i++ { - propagator.eventQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: propagator.gc.clock.Now(), Object: &scenario.events[i]}) - propagator.processEvent() - verifyGraphInvariants(scenario.name, propagator.uidToNode.uidToNode, t) + dependencyGraphBuilder.graphChanges.Add(&scenario.events[i]) + dependencyGraphBuilder.processGraphChanges() + verifyGraphInvariants(scenario.name, dependencyGraphBuilder.uidToNode.uidToNode, t) } } } @@ -321,18 +320,18 @@ func TestDependentsRace(t *testing.T) { const updates = 100 owner := &node{dependents: make(map[*node]struct{})} ownerUID := types.UID("owner") - gc.propagator.uidToNode.Write(owner) + gc.dependencyGraphBuilder.uidToNode.Write(owner) go func() { for i := 0; i < updates; i++ { dependent := &node{} - gc.propagator.addDependentToOwners(dependent, []metav1.OwnerReference{{UID: ownerUID}}) - gc.propagator.removeDependentFromOwners(dependent, []metav1.OwnerReference{{UID: ownerUID}}) + gc.dependencyGraphBuilder.addDependentToOwners(dependent, []metav1.OwnerReference{{UID: ownerUID}}) + gc.dependencyGraphBuilder.removeDependentFromOwners(dependent, []metav1.OwnerReference{{UID: ownerUID}}) } }() go func() { - gc.orphanQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: gc.clock.Now(), Object: owner}) + gc.attemptToOrphan.Add(owner) for i := 0; i < updates; i++ { - gc.orphanFinalizer() + gc.attemptToOrphanWorker() } }() } @@ -348,9 +347,13 @@ func TestGCListWatcher(t *testing.T) { if err != nil { t.Fatal(err) } - lw := gcListWatcher(client, podResource) - lw.Watch(metav1.ListOptions{ResourceVersion: "1"}) - lw.List(metav1.ListOptions{ResourceVersion: "1"}) + lw := listWatcher(client, podResource) + if _, err := lw.Watch(metav1.ListOptions{ResourceVersion: "1"}); err != nil { + t.Fatal(err) + } + if _, err := lw.List(metav1.ListOptions{ResourceVersion: "1"}); err != nil { + t.Fatal(err) + } if e, a := 2, len(testHandler.actions); e != a { t.Errorf("expect %d requests, got %d", e, a) } @@ -373,7 +376,7 @@ func podToGCNode(pod *v1.Pod) *node { }, Namespace: pod.Namespace, }, - // owners are intentionally left empty. The processItem routine should get the latest item from the server. + // owners are intentionally left empty. The attemptToDeleteItem routine should get the latest item from the server. 
owners: nil, } } @@ -447,12 +450,12 @@ func TestAbsentUIDCache(t *testing.T) { defer srv.Close() gc := setupGC(t, clientConfig) gc.absentOwnerCache = NewUIDCache(2) - gc.processItem(podToGCNode(rc1Pod1)) - gc.processItem(podToGCNode(rc2Pod1)) + gc.attemptToDeleteItem(podToGCNode(rc1Pod1)) + gc.attemptToDeleteItem(podToGCNode(rc2Pod1)) // rc1 should already be in the cache, no request should be sent. rc1 should be promoted in the UIDCache - gc.processItem(podToGCNode(rc1Pod2)) + gc.attemptToDeleteItem(podToGCNode(rc1Pod2)) // after this call, rc2 should be evicted from the UIDCache - gc.processItem(podToGCNode(rc3Pod1)) + gc.attemptToDeleteItem(podToGCNode(rc3Pod1)) // check cache if !gc.absentOwnerCache.Has(types.UID("1")) { t.Errorf("expected rc1 to be in the cache") @@ -474,3 +477,89 @@ func TestAbsentUIDCache(t *testing.T) { t.Errorf("expected only 1 GET rc1 request, got %d", count) } } + +func TestDeleteOwnerRefPatch(t *testing.T) { + original := v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: "100", + OwnerReferences: []metav1.OwnerReference{ + {UID: "1"}, + {UID: "2"}, + {UID: "3"}, + }, + }, + } + originalData := serilizeOrDie(t, original) + expected := v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: "100", + OwnerReferences: []metav1.OwnerReference{ + {UID: "1"}, + }, + }, + } + patch := deleteOwnerRefPatch("100", "2", "3") + patched, err := strategicpatch.StrategicMergePatch(originalData, patch, v1.Pod{}) + if err != nil { + t.Fatal(err) + } + var got v1.Pod + if err := json.Unmarshal(patched, &got); err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(expected, got) { + t.Errorf("expected: %#v,\ngot: %#v", expected, got) + } +} + +func TestUnblockOwnerReference(t *testing.T) { + trueVar := true + falseVar := false + original := v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: "100", + OwnerReferences: []metav1.OwnerReference{ + {UID: "1", BlockOwnerDeletion: &trueVar}, + {UID: "2", BlockOwnerDeletion: &falseVar}, + {UID: "3"}, + }, + }, + } + originalData := serilizeOrDie(t, original) + expected := v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + UID: "100", + OwnerReferences: []metav1.OwnerReference{ + {UID: "1", BlockOwnerDeletion: &falseVar}, + {UID: "2", BlockOwnerDeletion: &falseVar}, + {UID: "3"}, + }, + }, + } + accessor, err := meta.Accessor(&original) + if err != nil { + t.Fatal(err) + } + n := node{ + owners: accessor.GetOwnerReferences(), + } + patch, err := n.patchToUnblockOwnerReferences() + if err != nil { + t.Fatal(err) + } + patched, err := strategicpatch.StrategicMergePatch(originalData, patch, v1.Pod{}) + if err != nil { + t.Fatal(err) + } + var got v1.Pod + if err := json.Unmarshal(patched, &got); err != nil { + t.Fatal(err) + } + if !reflect.DeepEqual(expected, got) { + t.Errorf("expected: %#v,\ngot: %#v", expected, got) + t.Errorf("expected: %#v,\ngot: %#v", expected.OwnerReferences, got.OwnerReferences) + for _, ref := range got.OwnerReferences { + t.Errorf("ref.UID=%s, ref.BlockOwnerDeletion=%v", ref.UID, *ref.BlockOwnerDeletion) + } + } +} diff --git a/pkg/controller/garbagecollector/graph.go b/pkg/controller/garbagecollector/graph.go new file mode 100644 index 00000000000..59b36c2ebfa --- /dev/null +++ b/pkg/controller/garbagecollector/graph.go @@ -0,0 +1,159 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package garbagecollector
+
+import (
+	"fmt"
+	"sync"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+)
+
+type objectReference struct {
+	metav1.OwnerReference
+	// This is needed by the dynamic client
+	Namespace string
+}
+
+func (s objectReference) String() string {
+	return fmt.Sprintf("[%s/%s, namespace: %s, name: %s, uid: %s]", s.APIVersion, s.Kind, s.Namespace, s.Name, s.UID)
+}
+
+// The single-threaded GraphBuilder.processGraphChanges() is the sole writer of
+// the nodes. The multi-threaded GarbageCollector.attemptToDeleteItem() reads the nodes.
+// WARNING: node has different locks on different fields. setters and getters
+// use the respective locks, so the return values of the getters can be
+// inconsistent.
+type node struct {
+	identity objectReference
+	// dependents will be read by the orphan routines, so we need to protect it with a lock.
+	dependentsLock sync.RWMutex
+	// dependents are the nodes that have node.identity as a
+	// metadata.ownerReference.
+	dependents map[*node]struct{}
+	// this is set by processGraphChanges() if the object has a non-nil DeletionTimestamp
+	// and has the FinalizerDeleteDependents finalizer.
+	deletingDependents     bool
+	deletingDependentsLock sync.RWMutex
+	// this records whether the object's deletionTimestamp is non-nil.
+	beingDeleted     bool
+	beingDeletedLock sync.RWMutex
+	// when processing an Update event, we need to compare the updated
+	// ownerReferences with the owners recorded in the graph.
+	owners []metav1.OwnerReference
+}
+
+// An object is on a one-way trip to its final deletion if it starts being
+// deleted, so we only provide a function to set beingDeleted to true.
+func (n *node) markBeingDeleted() {
+	n.beingDeletedLock.Lock()
+	defer n.beingDeletedLock.Unlock()
+	n.beingDeleted = true
+}
+
+func (n *node) isBeingDeleted() bool {
+	n.beingDeletedLock.RLock()
+	defer n.beingDeletedLock.RUnlock()
+	return n.beingDeleted
+}
+
+func (n *node) markDeletingDependents() {
+	n.deletingDependentsLock.Lock()
+	defer n.deletingDependentsLock.Unlock()
+	n.deletingDependents = true
+}
+
+func (n *node) isDeletingDependents() bool {
+	n.deletingDependentsLock.RLock()
+	defer n.deletingDependentsLock.RUnlock()
+	return n.deletingDependents
+}
+
+func (ownerNode *node) addDependent(dependent *node) {
+	ownerNode.dependentsLock.Lock()
+	defer ownerNode.dependentsLock.Unlock()
+	ownerNode.dependents[dependent] = struct{}{}
+}
+
+func (ownerNode *node) deleteDependent(dependent *node) {
+	ownerNode.dependentsLock.Lock()
+	defer ownerNode.dependentsLock.Unlock()
+	delete(ownerNode.dependents, dependent)
+}
+
+func (ownerNode *node) dependentsLength() int {
+	ownerNode.dependentsLock.RLock()
+	defer ownerNode.dependentsLock.RUnlock()
+	return len(ownerNode.dependents)
+}
+
+// Note that this function does not provide any synchronization guarantees;
+// items could be added to or removed from ownerNode.dependents the moment this
+// function returns.
+func (ownerNode *node) getDependents() []*node {
+	ownerNode.dependentsLock.RLock()
+	defer ownerNode.dependentsLock.RUnlock()
+	var ret []*node
+	for dep := range ownerNode.dependents {
+		ret = append(ret, dep)
+	}
+	return ret
+}
+
+// blockingDependents returns the dependents that are blocking the deletion of
+// n, i.e., the dependents that have an ownerReference pointing to n with
+// BlockOwnerDeletion set to true.
+// Note that this function does not provide any synchronization guarantees;
+// items could be added to or removed from n.dependents the moment this
+// function returns.
+func (n *node) blockingDependents() []*node {
+	dependents := n.getDependents()
+	var ret []*node
+	for _, dep := range dependents {
+		for _, owner := range dep.owners {
+			if owner.UID == n.identity.UID && owner.BlockOwnerDeletion != nil && *owner.BlockOwnerDeletion {
+				ret = append(ret, dep)
+			}
+		}
+	}
+	return ret
+}
+
+type concurrentUIDToNode struct {
+	uidToNodeLock sync.RWMutex
+	uidToNode     map[types.UID]*node
+}
+
+func (m *concurrentUIDToNode) Write(node *node) {
+	m.uidToNodeLock.Lock()
+	defer m.uidToNodeLock.Unlock()
+	m.uidToNode[node.identity.UID] = node
+}
+
+func (m *concurrentUIDToNode) Read(uid types.UID) (*node, bool) {
+	m.uidToNodeLock.RLock()
+	defer m.uidToNodeLock.RUnlock()
+	n, ok := m.uidToNode[uid]
+	return n, ok
+}
+
+func (m *concurrentUIDToNode) Delete(uid types.UID) {
+	m.uidToNodeLock.Lock()
+	defer m.uidToNodeLock.Unlock()
+	delete(m.uidToNode, uid)
+}
diff --git a/pkg/controller/garbagecollector/graph_builder.go b/pkg/controller/garbagecollector/graph_builder.go
new file mode 100644
index 00000000000..a3d6754b8d9
--- /dev/null
+++ b/pkg/controller/garbagecollector/graph_builder.go
@@ -0,0 +1,497 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package garbagecollector
+
+import (
+	"fmt"
+	"reflect"
+	"time"
+
+	"github.com/golang/glog"
+
+	"k8s.io/apimachinery/pkg/api/meta"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/runtime"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/sets"
+	"k8s.io/apimachinery/pkg/util/wait"
+	"k8s.io/apimachinery/pkg/watch"
+	"k8s.io/client-go/dynamic"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/util/workqueue"
+)
+
+type eventType int
+
+const (
+	addEvent eventType = iota
+	updateEvent
+	deleteEvent
+)
+
+type event struct {
+	eventType eventType
+	obj       interface{}
+	// the update event comes with an old object, but it's not used by the garbage collector.
+	oldObj interface{}
+}
+
+// GraphBuilder: based on the events supplied by the informers, GraphBuilder updates
+// uidToNode, a graph that caches the dependencies as we know them, and enqueues
+// items to the attemptToDelete and attemptToOrphan queues.
+type GraphBuilder struct {
+	restMapper meta.RESTMapper
+	// each monitor lists/watches a resource; the results are funneled to the
+	// dependencyGraphBuilder.
+	monitors []cache.Controller
+	// metaOnlyClientPool uses a special codec, which removes fields except for
+	// apiVersion, kind, and metadata during decoding.
+	metaOnlyClientPool dynamic.ClientPool
+	// used to register, exactly once, the rate limiters of the clients used by
+	// the `monitors`.
+	registeredRateLimiterForControllers *RegisteredRateLimiter
+	// monitors are the producers of the graphChanges queue; GraphBuilder alters
+	// the in-memory graph according to the changes.
+	graphChanges workqueue.RateLimitingInterface
+	// uidToNode doesn't require a lock to protect, because only the
+	// single-threaded GraphBuilder.processGraphChanges() reads/writes it.
+	uidToNode *concurrentUIDToNode
+	// GraphBuilder is the producer of attemptToDelete and attemptToOrphan; GC is the consumer.
+	attemptToDelete workqueue.RateLimitingInterface
+	attemptToOrphan workqueue.RateLimitingInterface
+	// GraphBuilder and GC share the absentOwnerCache. Objects that are known to
+	// be non-existent are added to the cache.
+	absentOwnerCache *UIDCache
+}
+
+func listWatcher(client *dynamic.Client, resource schema.GroupVersionResource) *cache.ListWatch {
+	return &cache.ListWatch{
+		ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
+			// APIResource.Kind is not used by the dynamic client, so
+			// leave it empty. We want to list this resource in all
+			// namespaces if it's namespace scoped, so leaving
+			// APIResource.Namespaced as false is all right.
+			apiResource := metav1.APIResource{Name: resource.Resource}
+			return client.ParameterCodec(dynamic.VersionedParameterEncoderWithV1Fallback).
+				Resource(&apiResource, metav1.NamespaceAll).
+				List(options)
+		},
+		WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
+			// APIResource.Kind is not used by the dynamic client, so
+			// leave it empty. We want to watch this resource in all
+			// namespaces if it's namespace scoped, so leaving
+			// APIResource.Namespaced as false is all right.
+			apiResource := metav1.APIResource{Name: resource.Resource}
+			return client.ParameterCodec(dynamic.VersionedParameterEncoderWithV1Fallback).
+				Resource(&apiResource, metav1.NamespaceAll).
+				Watch(options)
+		},
+	}
+}
+
+func (gb *GraphBuilder) controllerFor(resource schema.GroupVersionResource, kind schema.GroupVersionKind) (cache.Controller, error) {
+	// TODO: consider storing in one storage.
+	glog.V(5).Infof("create storage for resource %s", resource)
+	client, err := gb.metaOnlyClientPool.ClientForGroupVersionKind(kind)
+	if err != nil {
+		return nil, err
+	}
+	gb.registeredRateLimiterForControllers.registerIfNotPresent(resource.GroupVersion(), client, "garbage_collector_monitoring")
+	setObjectTypeMeta := func(obj interface{}) {
+		runtimeObject, ok := obj.(runtime.Object)
+		if !ok {
+			utilruntime.HandleError(fmt.Errorf("expected runtime.Object, got %#v", obj))
+		}
+		runtimeObject.GetObjectKind().SetGroupVersionKind(kind)
+	}
+	_, monitor := cache.NewInformer(
+		listWatcher(client, resource),
+		nil,
+		ResourceResyncTime,
+		cache.ResourceEventHandlerFuncs{
+			// add the event to the dependencyGraphBuilder's graphChanges.
+			AddFunc: func(obj interface{}) {
+				setObjectTypeMeta(obj)
+				event := &event{
+					eventType: addEvent,
+					obj:       obj,
+				}
+				gb.graphChanges.Add(event)
+			},
+			UpdateFunc: func(oldObj, newObj interface{}) {
+				setObjectTypeMeta(newObj)
+				// TODO: check if there are differences in the ownerRefs,
+				// finalizers, and DeletionTimestamp; if not, ignore the update.
+				event := &event{updateEvent, newObj, oldObj}
+				gb.graphChanges.Add(event)
+			},
+			DeleteFunc: func(obj interface{}) {
+				// delta fifo may wrap the object in a cache.DeletedFinalStateUnknown, unwrap it
+				if deletedFinalStateUnknown, ok := obj.(cache.DeletedFinalStateUnknown); ok {
+					obj = deletedFinalStateUnknown.Obj
+				}
+				setObjectTypeMeta(obj)
+				event := &event{
+					eventType: deleteEvent,
+					obj:       obj,
+				}
+				gb.graphChanges.Add(event)
+			},
+		},
+	)
+	return monitor, nil
+}
+
+func (gb *GraphBuilder) monitorsForResources(resources map[schema.GroupVersionResource]struct{}) error {
+	for resource := range resources {
+		if _, ok := ignoredResources[resource]; ok {
+			glog.V(5).Infof("ignore resource %#v", resource)
+			continue
+		}
+		kind, err := gb.restMapper.KindFor(resource)
+		if err != nil {
+			return err
+		}
+		monitor, err := gb.controllerFor(resource, kind)
+		if err != nil {
+			return err
+		}
+		gb.monitors = append(gb.monitors, monitor)
+	}
+	return nil
+}
+
+func (gb *GraphBuilder) HasSynced() bool {
+	for _, monitor := range gb.monitors {
+		if !monitor.HasSynced() {
+			return false
+		}
+	}
+	return true
+}
+
+func (gb *GraphBuilder) Run(stopCh <-chan struct{}) {
+	for _, monitor := range gb.monitors {
+		go monitor.Run(stopCh)
+	}
+	go wait.Until(gb.runProcessGraphChanges, 1*time.Second, stopCh)
+}
+
+var ignoredResources = map[schema.GroupVersionResource]struct{}{
+	schema.GroupVersionResource{Group: "extensions", Version: "v1beta1", Resource: "replicationcontrollers"}:              {},
+	schema.GroupVersionResource{Group: "", Version: "v1", Resource: "bindings"}:                                           {},
+	schema.GroupVersionResource{Group: "", Version: "v1", Resource: "componentstatuses"}:                                  {},
+	schema.GroupVersionResource{Group: "", Version: "v1", Resource: "events"}:                                             {},
+	schema.GroupVersionResource{Group: "authentication.k8s.io", Version: "v1beta1", Resource: "tokenreviews"}:             {},
+	schema.GroupVersionResource{Group: "authorization.k8s.io", Version: "v1beta1", Resource: "subjectaccessreviews"}:      {},
+	schema.GroupVersionResource{Group: "authorization.k8s.io", Version: "v1beta1", Resource: "selfsubjectaccessreviews"}:  {},
+	schema.GroupVersionResource{Group: "authorization.k8s.io", Version: "v1beta1", Resource: "localsubjectaccessreviews"}: {},
+}
+
+func (gb *GraphBuilder) enqueueChanges(e *event) {
+	gb.graphChanges.Add(e)
+}
+
+// addDependentToOwners adds n to owners' dependents list. If the owner does not
+// exist in gb.uidToNode yet, a "virtual" node will be created to represent
+// the owner. The "virtual" node will be enqueued to the attemptToDelete queue,
+// so that attemptToDeleteItem() will verify if the owner exists according to the API server.
+func (gb *GraphBuilder) addDependentToOwners(n *node, owners []metav1.OwnerReference) {
+	for _, owner := range owners {
+		ownerNode, ok := gb.uidToNode.Read(owner.UID)
+		if !ok {
+			// Create a "virtual" node in the graph for the owner if it doesn't
+			// exist in the graph yet. Then enqueue the virtual node into the
+			// attemptToDelete queue. The garbage processor will enqueue a virtual delete
+			// event to delete it from the graph if the API server confirms this
+			// owner doesn't exist.
+			ownerNode = &node{
+				identity: objectReference{
+					OwnerReference: owner,
+					Namespace:      n.identity.Namespace,
+				},
+				dependents: make(map[*node]struct{}),
+			}
+			glog.V(5).Infof("add virtual node.identity: %s\n\n", ownerNode.identity)
+			gb.uidToNode.Write(ownerNode)
+			gb.attemptToDelete.Add(ownerNode)
+		}
+		ownerNode.addDependent(n)
+	}
+}
+
+// insertNode inserts the node into gb.uidToNode; then it finds all owners as listed
+// in n.owners, and adds the node to their dependents list.
+func (gb *GraphBuilder) insertNode(n *node) {
+	gb.uidToNode.Write(n)
+	gb.addDependentToOwners(n, n.owners)
+}
+
+// removeDependentFromOwners removes n from owners' dependents list.
+func (gb *GraphBuilder) removeDependentFromOwners(n *node, owners []metav1.OwnerReference) {
+	for _, owner := range owners {
+		ownerNode, ok := gb.uidToNode.Read(owner.UID)
+		if !ok {
+			continue
+		}
+		ownerNode.deleteDependent(n)
+	}
+}
+
+// removeNode removes the node from gb.uidToNode, then finds all
+// owners as listed in n.owners, and removes n from their dependents list.
+func (gb *GraphBuilder) removeNode(n *node) {
+	gb.uidToNode.Delete(n.identity.UID)
+	gb.removeDependentFromOwners(n, n.owners)
+}
+
+type ownerRefPair struct {
+	oldRef metav1.OwnerReference
+	newRef metav1.OwnerReference
+}
+
+// TODO: profile this function to see if a naive N^2 algorithm performs better
+// when the number of references is small.
+func referencesDiffs(old []metav1.OwnerReference, new []metav1.OwnerReference) (added []metav1.OwnerReference, removed []metav1.OwnerReference, changed []ownerRefPair) {
+	oldUIDToRef := make(map[string]metav1.OwnerReference)
+	for i := 0; i < len(old); i++ {
+		oldUIDToRef[string(old[i].UID)] = old[i]
+	}
+	oldUIDSet := sets.StringKeySet(oldUIDToRef)
+	newUIDToRef := make(map[string]metav1.OwnerReference)
+	for i := 0; i < len(new); i++ {
+		newUIDToRef[string(new[i].UID)] = new[i]
+	}
+	newUIDSet := sets.StringKeySet(newUIDToRef)
+
+	addedUID := newUIDSet.Difference(oldUIDSet)
+	removedUID := oldUIDSet.Difference(newUIDSet)
+	intersection := oldUIDSet.Intersection(newUIDSet)
+
+	for uid := range addedUID {
+		added = append(added, newUIDToRef[uid])
+	}
+	for uid := range removedUID {
+		removed = append(removed, oldUIDToRef[uid])
+	}
+	for uid := range intersection {
+		if !reflect.DeepEqual(oldUIDToRef[uid], newUIDToRef[uid]) {
+			changed = append(changed, ownerRefPair{oldRef: oldUIDToRef[uid], newRef: newUIDToRef[uid]})
+		}
+	}
+	return added, removed, changed
+}
+
+// returns whether the object in the event has just transitioned to "being deleted".
+func deletionStarts(oldObj interface{}, newAccessor metav1.Object) bool {
+	// The delta_fifo may combine the creation and update of the object into one
+	// event, so if there is no oldObj, we just return whether the newObj (via
+	// newAccessor) is being deleted.
+	if oldObj == nil {
+		if newAccessor.GetDeletionTimestamp() == nil {
+			return false
+		}
+		return true
+	}
+	oldAccessor, err := meta.Accessor(oldObj)
+	if err != nil {
+		utilruntime.HandleError(fmt.Errorf("cannot access oldObj: %v", err))
+		return false
+	}
+	return beingDeleted(newAccessor) && !beingDeleted(oldAccessor)
+}
+
+func beingDeleted(accessor metav1.Object) bool {
+	return accessor.GetDeletionTimestamp() != nil
+}
+
+func hasDeleteDependentsFinalizer(accessor metav1.Object) bool {
+	finalizers := accessor.GetFinalizers()
+	for _, finalizer := range finalizers {
+		if finalizer == metav1.FinalizerDeleteDependents {
+			return true
+		}
+	}
+	return false
+}
+
+func hasOrphanFinalizer(accessor metav1.Object) bool {
+	finalizers := accessor.GetFinalizers()
+	for _, finalizer := range finalizers {
+		if finalizer == metav1.FinalizerOrphanDependents {
+			return true
+		}
+	}
+	return false
+}
+
+// this function takes newAccessor directly because the caller already
+// instantiates an accessor for the newObj.
+func startsWaitingForDependentsDeleted(oldObj interface{}, newAccessor metav1.Object) bool {
+	return deletionStarts(oldObj, newAccessor) && hasDeleteDependentsFinalizer(newAccessor)
+}
+
+// this function takes newAccessor directly because the caller already
+// instantiates an accessor for the newObj.
+func startsWaitingForDependentsOrphaned(oldObj interface{}, newAccessor metav1.Object) bool {
+	return deletionStarts(oldObj, newAccessor) && hasOrphanFinalizer(newAccessor)
+}
+
+// if a blocking ownerReference pointing to an object is removed, or has its
+// BlockOwnerDeletion set to false, add the referenced object to the attemptToDelete queue.
+func (gb *GraphBuilder) addUnblockedOwnersToDeleteQueue(removed []metav1.OwnerReference, changed []ownerRefPair) {
+	for _, ref := range removed {
+		if ref.BlockOwnerDeletion != nil && *ref.BlockOwnerDeletion {
+			node, found := gb.uidToNode.Read(ref.UID)
+			if !found {
+				glog.V(5).Infof("cannot find %s in uidToNode", ref.UID)
+				continue
+			}
+			gb.attemptToDelete.Add(node)
+		}
+	}
+	for _, c := range changed {
+		wasBlocked := c.oldRef.BlockOwnerDeletion != nil && *c.oldRef.BlockOwnerDeletion
+		isUnblocked := c.newRef.BlockOwnerDeletion == nil || (c.newRef.BlockOwnerDeletion != nil && !*c.newRef.BlockOwnerDeletion)
+		if wasBlocked && isUnblocked {
+			node, found := gb.uidToNode.Read(c.newRef.UID)
+			if !found {
+				glog.V(5).Infof("cannot find %s in uidToNode", c.newRef.UID)
+				continue
+			}
+			gb.attemptToDelete.Add(node)
+		}
+	}
+}
+
+func (gb *GraphBuilder) processTransitions(oldObj interface{}, newAccessor metav1.Object, n *node) {
+	if startsWaitingForDependentsOrphaned(oldObj, newAccessor) {
+		glog.V(5).Infof("add %s to the attemptToOrphan", n.identity)
+		gb.attemptToOrphan.Add(n)
+		return
+	}
+	if startsWaitingForDependentsDeleted(oldObj, newAccessor) {
+		glog.V(2).Infof("add %s to the attemptToDelete, because it's waiting for its dependents to be deleted", n.identity)
+		// if n was added as a "virtual" node, its deletingDependents field is not properly set, so always set it here.
+		n.markDeletingDependents()
+		for dep := range n.dependents {
+			gb.attemptToDelete.Add(dep)
+		}
+		gb.attemptToDelete.Add(n)
+	}
+}
+
+func (gb *GraphBuilder) runProcessGraphChanges() {
+	for gb.processGraphChanges() {
+	}
+}
+
+// Dequeueing an event from graphChanges, updating the graph, and populating the
+// attemptToDelete and attemptToOrphan queues.
+func (gb *GraphBuilder) processGraphChanges() bool {
+	item, quit := gb.graphChanges.Get()
+	if quit {
+		return false
+	}
+	defer gb.graphChanges.Done(item)
+	event, ok := item.(*event)
+	if !ok {
+		utilruntime.HandleError(fmt.Errorf("expect a *event, got %v", item))
+		return true
+	}
+	obj := event.obj
+	accessor, err := meta.Accessor(obj)
+	if err != nil {
+		utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err))
+		return true
+	}
+	typeAccessor, err := meta.TypeAccessor(obj)
+	if err != nil {
+		utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err))
+		return true
+	}
+	glog.V(5).Infof("GraphBuilder process object: %s/%s, namespace %s, name %s, event type %s", typeAccessor.GetAPIVersion(), typeAccessor.GetKind(), accessor.GetNamespace(), accessor.GetName(), event.eventType)
+	// Check if the node already exists
+	existingNode, found := gb.uidToNode.Read(accessor.GetUID())
+	switch {
+	case (event.eventType == addEvent || event.eventType == updateEvent) && !found:
+		newNode := &node{
+			identity: objectReference{
+				OwnerReference: metav1.OwnerReference{
+					APIVersion: typeAccessor.GetAPIVersion(),
+					Kind:       typeAccessor.GetKind(),
+					UID:        accessor.GetUID(),
+					Name:       accessor.GetName(),
+				},
+				Namespace: accessor.GetNamespace(),
+			},
+			dependents:         make(map[*node]struct{}),
+			owners:             accessor.GetOwnerReferences(),
+			deletingDependents: beingDeleted(accessor) && hasDeleteDependentsFinalizer(accessor),
+			beingDeleted:       beingDeleted(accessor),
+		}
+		gb.insertNode(newNode)
+		// the underlying delta_fifo may combine a creation and a deletion into
+		// one event, so we need to further process the event.
+		gb.processTransitions(event.oldObj, accessor, newNode)
+	case (event.eventType == addEvent || event.eventType == updateEvent) && found:
+		// handle changes in ownerReferences
+		added, removed, changed := referencesDiffs(existingNode.owners, accessor.GetOwnerReferences())
+		if len(added) != 0 || len(removed) != 0 || len(changed) != 0 {
+			// check if the changed dependency graph unblocks owners that are
+			// waiting for the deletion of their dependents.
+			gb.addUnblockedOwnersToDeleteQueue(removed, changed)
+			// update the node itself
+			existingNode.owners = accessor.GetOwnerReferences()
+			// Add the node to its new owners' dependent lists.
+			gb.addDependentToOwners(existingNode, added)
+			// remove the node from the dependents list of nodes that are no longer in
+			// the node's owners list.
+			gb.removeDependentFromOwners(existingNode, removed)
+		}
+
+		if beingDeleted(accessor) {
+			existingNode.markBeingDeleted()
+		}
+		gb.processTransitions(event.oldObj, accessor, existingNode)
+	case event.eventType == deleteEvent:
+		if !found {
+			glog.V(5).Infof("%v doesn't exist in the graph, this shouldn't happen", accessor.GetUID())
+			return true
+		}
+		// removeNode updates the graph
+		gb.removeNode(existingNode)
+		existingNode.dependentsLock.RLock()
+		defer existingNode.dependentsLock.RUnlock()
+		if len(existingNode.dependents) > 0 {
+			gb.absentOwnerCache.Add(accessor.GetUID())
+		}
+		for dep := range existingNode.dependents {
+			gb.attemptToDelete.Add(dep)
+		}
+		for _, owner := range existingNode.owners {
+			ownerNode, found := gb.uidToNode.Read(owner.UID)
+			if !found || !ownerNode.isDeletingDependents() {
+				continue
+			}
+			// this is to let attemptToDeleteItem check whether all the owner's
+			// dependents are deleted; if so, the owner will be deleted.
+			gb.attemptToDelete.Add(ownerNode)
+		}
+	}
+	return true
+}
diff --git a/pkg/controller/garbagecollector/operations.go b/pkg/controller/garbagecollector/operations.go
new file mode 100644
index 00000000000..657045b6523
--- /dev/null
+++ b/pkg/controller/garbagecollector/operations.go
@@ -0,0 +1,135 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package garbagecollector
+
+import (
+	"fmt"
+
+	"github.com/golang/glog"
+
+	"k8s.io/apimachinery/pkg/api/errors"
+	"k8s.io/apimachinery/pkg/api/meta"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
+	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/kubernetes/pkg/client/retry"
+)
+
+// apiResource consults the REST mapper to translate an <apiVersion, kind, namespaced> tuple to a metav1.APIResource struct.
+func (gc *GarbageCollector) apiResource(apiVersion, kind string, namespaced bool) (*metav1.APIResource, error) {
+	fqKind := schema.FromAPIVersionAndKind(apiVersion, kind)
+	mapping, err := gc.restMapper.RESTMapping(fqKind.GroupKind(), apiVersion)
+	if err != nil {
+		return nil, fmt.Errorf("unable to get REST mapping for kind: %s, version: %s", kind, apiVersion)
+	}
+	glog.V(5).Infof("map kind %s, version %s to resource %s", kind, apiVersion, mapping.Resource)
+	resource := metav1.APIResource{
+		Name:       mapping.Resource,
+		Namespaced: namespaced,
+		Kind:       kind,
+	}
+	return &resource, nil
+}
+
+func (gc *GarbageCollector) deleteObject(item objectReference, policy *metav1.DeletionPropagation) error {
+	fqKind := schema.FromAPIVersionAndKind(item.APIVersion, item.Kind)
+	client, err := gc.clientPool.ClientForGroupVersionKind(fqKind)
+	gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation")
+	resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0)
+	if err != nil {
+		return err
+	}
+	uid := item.UID
+	preconditions := metav1.Preconditions{UID: &uid}
+	deleteOptions := metav1.DeleteOptions{Preconditions: &preconditions, PropagationPolicy: policy}
+	return client.Resource(resource, item.Namespace).Delete(item.Name, &deleteOptions)
+}
+
+func (gc *GarbageCollector) getObject(item objectReference) (*unstructured.Unstructured, error) {
+	fqKind := schema.FromAPIVersionAndKind(item.APIVersion, item.Kind)
+	client, err := gc.clientPool.ClientForGroupVersionKind(fqKind)
+	gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation")
+	resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0)
+	if err != nil {
+		return nil, err
+	}
+	return client.Resource(resource, item.Namespace).Get(item.Name)
+}
+
+func (gc *GarbageCollector) updateObject(item objectReference, obj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
+	fqKind := schema.FromAPIVersionAndKind(item.APIVersion, item.Kind)
+	client, err := gc.clientPool.ClientForGroupVersionKind(fqKind)
+	gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation")
+	resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0)
+	if err != nil {
+		return nil, err
+	}
+	return client.Resource(resource, item.Namespace).Update(obj)
+}
+
+func (gc *GarbageCollector) patchObject(item objectReference, patch []byte) (*unstructured.Unstructured, error) {
+	fqKind := schema.FromAPIVersionAndKind(item.APIVersion, item.Kind)
+	client, err := gc.clientPool.ClientForGroupVersionKind(fqKind)
+	gc.registeredRateLimiter.registerIfNotPresent(fqKind.GroupVersion(), client, "garbage_collector_operation")
+	resource, err := gc.apiResource(item.APIVersion, item.Kind, len(item.Namespace) != 0)
+	if err != nil {
+		return nil, err
+	}
+	return client.Resource(resource, item.Namespace).Patch(item.Name, types.StrategicMergePatchType, patch)
+}
+
+// TODO: Use Patch when strategicmerge supports deleting an entry from a
+// slice of a base type.
+func (gc *GarbageCollector) removeFinalizer(owner *node, targetFinalizer string) error {
+	err := retry.RetryOnConflict(retry.DefaultBackoff, func() error {
+		ownerObject, err := gc.getObject(owner.identity)
+		if errors.IsNotFound(err) {
+			return nil
+		}
+		if err != nil {
+			return fmt.Errorf("cannot finalize owner %s, because it cannot be retrieved: %v. The garbage collector will retry later.", owner.identity, err)
+		}
+		accessor, err := meta.Accessor(ownerObject)
+		if err != nil {
+			return fmt.Errorf("cannot access the owner object %v: %v. The garbage collector will retry later.", ownerObject, err)
+		}
+		finalizers := accessor.GetFinalizers()
+		var newFinalizers []string
+		found := false
+		for _, f := range finalizers {
+			if f == targetFinalizer {
+				found = true
+				continue
+			}
+			newFinalizers = append(newFinalizers, f)
+		}
+		if !found {
+			glog.V(5).Infof("the %s finalizer is already removed from object %s", targetFinalizer, owner.identity)
+			return nil
+		}
+		// remove the target finalizer from the owner object
+		ownerObject.SetFinalizers(newFinalizers)
+		_, err = gc.updateObject(owner.identity, ownerObject)
+		return err
+	})
+	if errors.IsConflict(err) {
+		return fmt.Errorf("the maximum number of update retries (%d) has been reached. The garbage collector will retry later for owner %v.", retry.DefaultBackoff.Steps, owner.identity)
+	}
+	return err
+}
diff --git a/pkg/controller/garbagecollector/patch.go b/pkg/controller/garbagecollector/patch.go
new file mode 100644
index 00000000000..8f8f9fb75ab
--- /dev/null
+++ b/pkg/controller/garbagecollector/patch.go
@@ -0,0 +1,54 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package garbagecollector
+
+import (
+	"encoding/json"
+	"fmt"
+	"strings"
+
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/types"
+	"k8s.io/kubernetes/pkg/controller/garbagecollector/metaonly"
+)
+
+func deleteOwnerRefPatch(dependentUID types.UID, ownerUIDs ...types.UID) []byte {
+	var pieces []string
+	for _, ownerUID := range ownerUIDs {
+		pieces = append(pieces, fmt.Sprintf(`{"$patch":"delete","uid":"%s"}`, ownerUID))
+	}
+	patch := fmt.Sprintf(`{"metadata":{"ownerReferences":[%s],"uid":"%s"}}`, strings.Join(pieces, ","), dependentUID)
+	return []byte(patch)
+}
+
+// generate a patch that sets BlockOwnerDeletion to false for every
+// ownerReference of the node that currently has it set to true.
+func (n *node) patchToUnblockOwnerReferences() ([]byte, error) {
+	var dummy metaonly.MetadataOnlyObject
+	var blockingRefs []metav1.OwnerReference
+	falseVar := false
+	for _, owner := range n.owners {
+		if owner.BlockOwnerDeletion != nil && *owner.BlockOwnerDeletion {
+			ref := owner
+			ref.BlockOwnerDeletion = &falseVar
+			blockingRefs = append(blockingRefs, ref)
+		}
+	}
+	dummy.ObjectMeta.SetOwnerReferences(blockingRefs)
+	dummy.ObjectMeta.UID = n.identity.UID
+	return json.Marshal(dummy)
+}
diff --git a/pkg/controller/replicaset/replica_set.go b/pkg/controller/replicaset/replica_set.go
index 8cbe7c03ed7..e165a9e5c56 100644
--- a/pkg/controller/replicaset/replica_set.go
+++ b/pkg/controller/replicaset/replica_set.go
@@ -481,14 +481,14 @@ func (rsc *ReplicaSetController) manageReplicas(filteredPods []*v1.Pod, rs *exte
 			go func() {
 				defer wg.Done()
 				var err error
-
-				var trueVar = true
+				boolPtr := func(b bool) *bool { return &b }
 				controllerRef := &metav1.OwnerReference{
-					APIVersion: getRSKind().GroupVersion().String(),
-					Kind:       getRSKind().Kind,
-					Name:       rs.Name,
-					UID:        rs.UID,
-					Controller: &trueVar,
+					APIVersion:         getRSKind().GroupVersion().String(),
+					Kind:               getRSKind().Kind,
+					Name:               rs.Name,
+					UID:                rs.UID,
+					BlockOwnerDeletion: boolPtr(true),
+					Controller:         boolPtr(true),
 				}
 				err = rsc.podControl.CreatePodsWithControllerRef(rs.Namespace, &rs.Spec.Template, rs, controllerRef)
 				if err != nil {
diff --git a/pkg/controller/replication/replication_controller.go b/pkg/controller/replication/replication_controller.go
index cd8e57e0241..fba38f0df82 100644
--- a/pkg/controller/replication/replication_controller.go
+++ b/pkg/controller/replication/replication_controller.go
@@ -479,13 +479,14 @@ func (rm *ReplicationManager) manageReplicas(filteredPods []*v1.Pod, rc *v1.Repl
 			go func() {
 				defer wg.Done()
 				var err error
-				var trueVar = true
+				boolPtr := func(b bool) *bool { return &b }
 				controllerRef := &metav1.OwnerReference{
-					APIVersion: getRCKind().GroupVersion().String(),
-					Kind:       getRCKind().Kind,
-					Name:       rc.Name,
-					UID:        rc.UID,
-					Controller: &trueVar,
+					APIVersion:         getRCKind().GroupVersion().String(),
+					Kind:               getRCKind().Kind,
+					Name:               rc.Name,
+					UID:                rc.UID,
+					BlockOwnerDeletion: boolPtr(true),
+					Controller:         boolPtr(true),
 				}
 				err = rm.podControl.CreatePodsWithControllerRef(rc.Namespace, rc.Spec.Template, rc, controllerRef)
 				if err != nil {
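
referencesDiffs in graph_builder.go classifies ownerReference changes by UID into added, removed, and changed sets before the graph is updated. Below is a simplified standalone illustration of that classification; it is not the package function (which operates on metav1.OwnerReference and uses sets.String), and the ownerRef type and diffRefs helper are invented for the example.

package main

import "fmt"

type ownerRef struct {
	UID                string
	BlockOwnerDeletion bool
}

// diffRefs splits two ownerReference lists, keyed by UID, into refs that were
// added, refs that were removed, and refs whose fields changed.
func diffRefs(old, new []ownerRef) (added, removed []ownerRef, changed [][2]ownerRef) {
	oldByUID := map[string]ownerRef{}
	for _, r := range old {
		oldByUID[r.UID] = r
	}
	newByUID := map[string]ownerRef{}
	for _, r := range new {
		newByUID[r.UID] = r
	}
	for uid, r := range newByUID {
		if o, ok := oldByUID[uid]; !ok {
			added = append(added, r)
		} else if o != r {
			changed = append(changed, [2]ownerRef{o, r})
		}
	}
	for uid, r := range oldByUID {
		if _, ok := newByUID[uid]; !ok {
			removed = append(removed, r)
		}
	}
	return added, removed, changed
}

func main() {
	old := []ownerRef{{UID: "1", BlockOwnerDeletion: true}, {UID: "2"}}
	new := []ownerRef{{UID: "1", BlockOwnerDeletion: false}, {UID: "3"}}
	added, removed, changed := diffRefs(old, new)
	fmt.Println("added:", added)     // added: [{3 false}]
	fmt.Println("removed:", removed) // removed: [{2 false}]
	fmt.Println("changed:", changed) // changed: [[{1 true} {1 false}]]
}

In the real code, a "changed" pair whose old ref was blocking and whose new ref is not is what addUnblockedOwnersToDeleteQueue uses to requeue the owner.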
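
For reference, the strategic merge patch built by deleteOwnerRefPatch has the following wire shape. The sketch below is a standalone illustration that only mirrors the formatting logic; the helper name deleteOwnerRefPatchJSON, the package main wrapper, and the plain-string UIDs are illustrative and not part of this patch.

package main

import (
	"fmt"
	"strings"
)

// mirrors deleteOwnerRefPatch: a strategic merge patch that deletes the listed
// ownerReferences, matched by uid, from the dependent object identified by dependentUID.
func deleteOwnerRefPatchJSON(dependentUID string, ownerUIDs ...string) string {
	var pieces []string
	for _, uid := range ownerUIDs {
		pieces = append(pieces, fmt.Sprintf(`{"$patch":"delete","uid":"%s"}`, uid))
	}
	return fmt.Sprintf(`{"metadata":{"ownerReferences":[%s],"uid":"%s"}}`, strings.Join(pieces, ","), dependentUID)
}

func main() {
	// Removing owners "2" and "3" from dependent "100" prints:
	// {"metadata":{"ownerReferences":[{"$patch":"delete","uid":"2"},{"$patch":"delete","uid":"3"}],"uid":"100"}}
	fmt.Println(deleteOwnerRefPatchJSON("100", "2", "3"))
}

Sent with types.StrategicMergePatchType, the "$patch":"delete" directives remove the matching ownerReferences entries by uid, which is what TestDeleteOwnerRefPatch exercises; patchToUnblockOwnerReferences instead marshals a metadata-only object whose ownerReferences carry "blockOwnerDeletion":false, as exercised by TestUnblockOwnerReference.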
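
The replica_set.go and replication_controller.go hunks set both Controller and BlockOwnerDeletion to true on the controllerRef. A small sketch of the resulting ownerReference as it would appear on a created pod, assuming only k8s.io/apimachinery is on the import path; the APIVersion, Kind, Name, and UID values are made up for illustration.

package main

import (
	"encoding/json"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
)

func main() {
	boolPtr := func(b bool) *bool { return &b }
	// the same shape manageReplicas now builds for its controllerRef
	controllerRef := metav1.OwnerReference{
		APIVersion:         "extensions/v1beta1",
		Kind:               "ReplicaSet",
		Name:               "frontend",
		UID:                types.UID("d9607e19-f88f-11e6-a518-42010a800195"),
		Controller:         boolPtr(true),
		BlockOwnerDeletion: boolPtr(true),
	}
	out, err := json.MarshalIndent(controllerRef, "", "  ")
	if err != nil {
		panic(err)
	}
	// prints the entry as it would appear under metadata.ownerReferences of the
	// created pod, including "blockOwnerDeletion": true
	fmt.Println(string(out))
}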