diff --git a/pkg/controller/garbagecollector/garbagecollector.go b/pkg/controller/garbagecollector/garbagecollector.go
index 4f6d445c763..4eb39e703c3 100644
--- a/pkg/controller/garbagecollector/garbagecollector.go
+++ b/pkg/controller/garbagecollector/garbagecollector.go
@@ -288,7 +288,7 @@ func (gc *GarbageCollector) IsSynced() bool {
 }
 
 func (gc *GarbageCollector) runAttemptToDeleteWorker(ctx context.Context) {
-	for gc.attemptToDeleteWorker(ctx) {
+	for gc.processAttemptToDeleteWorker(ctx) {
 	}
 }
 
@@ -296,7 +296,7 @@ var enqueuedVirtualDeleteEventErr = goerrors.New("enqueued virtual delete event"
 
 var namespacedOwnerOfClusterScopedObjectErr = goerrors.New("cluster-scoped objects cannot refer to namespaced owners")
 
-func (gc *GarbageCollector) attemptToDeleteWorker(ctx context.Context) bool {
+func (gc *GarbageCollector) processAttemptToDeleteWorker(ctx context.Context) bool {
 	item, quit := gc.attemptToDelete.Get()
 	gc.workerLock.RLock()
 	defer gc.workerLock.RUnlock()
@@ -304,10 +304,30 @@ func (gc *GarbageCollector) attemptToDeleteWorker(ctx context.Context) bool {
 		return false
 	}
 	defer gc.attemptToDelete.Done(item)
+
+	action := gc.attemptToDeleteWorker(ctx, item)
+	switch action {
+	case forgetItem:
+		gc.attemptToDelete.Forget(item)
+	case requeueItem:
+		gc.attemptToDelete.AddRateLimited(item)
+	}
+
+	return true
+}
+
+type workQueueItemAction int
+
+const (
+	requeueItem = iota
+	forgetItem
+)
+
+func (gc *GarbageCollector) attemptToDeleteWorker(ctx context.Context, item interface{}) workQueueItemAction {
 	n, ok := item.(*node)
 	if !ok {
 		utilruntime.HandleError(fmt.Errorf("expect *node, got %#v", item))
-		return true
+		return forgetItem
 	}
 
 	if !n.isObserved() {
@@ -316,23 +336,23 @@ func (gc *GarbageCollector) attemptToDeleteWorker(ctx context.Context) bool {
 			// this can happen if attemptToDelete loops on a requeued virtual node because attemptToDeleteItem returned an error,
 			// and in the meantime a deletion of the real object associated with that uid was observed
 			klog.V(5).Infof("item %s no longer in the graph, skipping attemptToDeleteItem", n)
-			return true
+			return forgetItem
 		}
 		if nodeFromGraph.isObserved() {
 			// this can happen if attemptToDelete loops on a requeued virtual node because attemptToDeleteItem returned an error,
 			// and in the meantime the real object associated with that uid was observed
 			klog.V(5).Infof("item %s no longer virtual in the graph, skipping attemptToDeleteItem on virtual node", n)
-			return true
+			return forgetItem
 		}
 	}
 
 	err := gc.attemptToDeleteItem(ctx, n)
 	if err == enqueuedVirtualDeleteEventErr {
 		// a virtual event was produced and will be handled by processGraphChanges, no need to requeue this node
-		return true
+		return forgetItem
 	} else if err == namespacedOwnerOfClusterScopedObjectErr {
 		// a cluster-scoped object referring to a namespaced owner is an error that will not resolve on retry, no need to requeue this node
-		return true
+		return forgetItem
 	} else if err != nil {
 		if _, ok := err.(*restMappingError); ok {
 			// There are at least two ways this can happen:
@@ -347,15 +367,16 @@ func (gc *GarbageCollector) attemptToDeleteWorker(ctx context.Context) bool {
 			utilruntime.HandleError(fmt.Errorf("error syncing item %s: %v", n, err))
 		}
 		// retry if garbage collection of an object failed.
-		gc.attemptToDelete.AddRateLimited(item)
+		return requeueItem
 	} else if !n.isObserved() {
 		// requeue if item hasn't been observed via an informer event yet.
 		// otherwise a virtual node for an item added AND removed during watch reestablishment can get stuck in the graph and never removed.
 		// see https://issue.k8s.io/56121
 		klog.V(5).Infof("item %s hasn't been observed via informer yet", n.identity)
-		gc.attemptToDelete.AddRateLimited(item)
+		return requeueItem
 	}
-	return true
+
+	return forgetItem
 }
 
 // isDangling check if a reference is pointing to an object that doesn't exist.
@@ -639,16 +660,16 @@ func (gc *GarbageCollector) orphanDependents(owner objectReference, dependents [
 }
 
 func (gc *GarbageCollector) runAttemptToOrphanWorker() {
-	for gc.attemptToOrphanWorker() {
+	for gc.processAttemptToOrphanWorker() {
 	}
 }
 
-// attemptToOrphanWorker dequeues a node from the attemptToOrphan, then finds its
+// processAttemptToOrphanWorker dequeues a node from the attemptToOrphan, then finds its
 // dependents based on the graph maintained by the GC, then removes it from the
 // OwnerReferences of its dependents, and finally updates the owner to remove
 // the "Orphan" finalizer. The node is added back into the attemptToOrphan if any of
 // these steps fail.
-func (gc *GarbageCollector) attemptToOrphanWorker() bool {
+func (gc *GarbageCollector) processAttemptToOrphanWorker() bool {
 	item, quit := gc.attemptToOrphan.Get()
 	gc.workerLock.RLock()
 	defer gc.workerLock.RUnlock()
@@ -656,10 +677,23 @@ func (gc *GarbageCollector) attemptToOrphanWorker() bool {
 		return false
 	}
 	defer gc.attemptToOrphan.Done(item)
+
+	action := gc.attemptToOrphanWorker(item)
+	switch action {
+	case forgetItem:
+		gc.attemptToOrphan.Forget(item)
+	case requeueItem:
+		gc.attemptToOrphan.AddRateLimited(item)
+	}
+
+	return true
+}
+
+func (gc *GarbageCollector) attemptToOrphanWorker(item interface{}) workQueueItemAction {
 	owner, ok := item.(*node)
 	if !ok {
 		utilruntime.HandleError(fmt.Errorf("expect *node, got %#v", item))
-		return true
+		return forgetItem
 	}
 	// we don't need to lock each element, because they never get updated
 	owner.dependentsLock.RLock()
@@ -672,16 +706,15 @@ func (gc *GarbageCollector) attemptToOrphanWorker() bool {
 	err := gc.orphanDependents(owner.identity, dependents)
 	if err != nil {
 		utilruntime.HandleError(fmt.Errorf("orphanDependents for %s failed with %v", owner.identity, err))
-		gc.attemptToOrphan.AddRateLimited(item)
-		return true
+		return requeueItem
 	}
 	// update the owner, remove "orphaningFinalizer" from its finalizers list
 	err = gc.removeFinalizer(owner, metav1.FinalizerOrphanDependents)
 	if err != nil {
 		utilruntime.HandleError(fmt.Errorf("removeOrphanFinalizer for %s failed with %v", owner.identity, err))
-		gc.attemptToOrphan.AddRateLimited(item)
+		return requeueItem
 	}
-	return true
+	return forgetItem
 }
 
 // *FOR TEST USE ONLY*
diff --git a/pkg/controller/garbagecollector/garbagecollector_test.go b/pkg/controller/garbagecollector/garbagecollector_test.go
index f5af03c94f1..5df9fef8f0e 100644
--- a/pkg/controller/garbagecollector/garbagecollector_test.go
+++ b/pkg/controller/garbagecollector/garbagecollector_test.go
@@ -460,7 +460,7 @@ func TestDependentsRace(t *testing.T) {
 		defer wg.Done()
 		for i := 0; i < updates; i++ {
 			gc.attemptToOrphan.Add(owner)
-			gc.attemptToOrphanWorker()
+			gc.processAttemptToOrphanWorker()
 		}
 	}()
 	wg.Wait()
@@ -2445,7 +2445,7 @@ func processAttemptToDelete(count int) step {
 			if count <= 0 {
 				// process all
 				for ctx.gc.dependencyGraphBuilder.attemptToDelete.Len() != 0 {
-					ctx.gc.attemptToDeleteWorker(context.TODO())
+					ctx.gc.processAttemptToDeleteWorker(context.TODO())
 				}
 			} else {
 				for i := 0; i < count; i++ {
@@ -2453,7 +2453,7 @@ func processAttemptToDelete(count int) step {
 						ctx.t.Errorf("expected at least %d pending changes, got %d", count, i+1)
 						return
 					}
-					ctx.gc.attemptToDeleteWorker(context.TODO())
+					ctx.gc.processAttemptToDeleteWorker(context.TODO())
 				}
 			}
 		},