refactor garbage collection controllers to make Forget impossible to forget

astraw99 2022-01-09 15:15:03 +08:00
parent d2c9456963
commit a5a54754d5
2 changed files with 54 additions and 21 deletions
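
The pattern this commit applies, sketched below in standalone form using client-go's workqueue package: the process* wrapper owns all of the queue bookkeeping (Get, a deferred Done, and exactly one of Forget or AddRateLimited), while the inner worker only returns the action it wants taken, so no return path can silently skip Forget. This is not the garbage collector code itself; the controller, processNextItem, and handle names are illustrative, while workQueueItemAction, forgetItem, and requeueItem mirror the names introduced in the diff.

package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

// Mirrors the action type added in this commit: the inner worker reports what
// should happen to the item instead of manipulating the queue itself.
type workQueueItemAction int

const (
	requeueItem = iota
	forgetItem
)

// controller, processNextItem, and handle are illustrative names for this sketch only.
type controller struct {
	queue workqueue.RateLimitingInterface
}

// processNextItem owns the queue bookkeeping: Get, a deferred Done, and exactly
// one of Forget or AddRateLimited, chosen from the returned action.
func (c *controller) processNextItem() bool {
	item, quit := c.queue.Get()
	if quit {
		return false
	}
	defer c.queue.Done(item)

	switch c.handle(item) {
	case forgetItem:
		c.queue.Forget(item)
	case requeueItem:
		c.queue.AddRateLimited(item)
	}
	return true
}

// handle holds only the decision logic; every return path names an action,
// so it cannot forget to clear the item's rate-limiting state.
func (c *controller) handle(item interface{}) workQueueItemAction {
	key, ok := item.(string)
	if !ok {
		return forgetItem // malformed item: drop it rather than retry forever
	}
	if key == "transient-failure" {
		return requeueItem // retry later with rate limiting
	}
	fmt.Println("processed", key)
	return forgetItem
}

func main() {
	c := &controller{queue: workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())}
	c.queue.Add("some-key")
	for c.queue.Len() > 0 && c.processNextItem() {
	}
	c.queue.ShutDown()
}

The switch in the wrapper is the only place the queue is touched after Done, which is what the commit title means by making Forget impossible to forget.
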


@@ -288,7 +288,7 @@ func (gc *GarbageCollector) IsSynced() bool {
}
func (gc *GarbageCollector) runAttemptToDeleteWorker(ctx context.Context) {
for gc.attemptToDeleteWorker(ctx) {
for gc.processAttemptToDeleteWorker(ctx) {
}
}
@@ -296,7 +296,7 @@ var enqueuedVirtualDeleteEventErr = goerrors.New("enqueued virtual delete event"
var namespacedOwnerOfClusterScopedObjectErr = goerrors.New("cluster-scoped objects cannot refer to namespaced owners")
func (gc *GarbageCollector) attemptToDeleteWorker(ctx context.Context) bool {
func (gc *GarbageCollector) processAttemptToDeleteWorker(ctx context.Context) bool {
item, quit := gc.attemptToDelete.Get()
gc.workerLock.RLock()
defer gc.workerLock.RUnlock()
@@ -304,10 +304,30 @@ func (gc *GarbageCollector) attemptToDeleteWorker(ctx context.Context) bool {
return false
}
defer gc.attemptToDelete.Done(item)
action := gc.attemptToDeleteWorker(ctx, item)
switch action {
case forgetItem:
gc.attemptToDelete.Forget(item)
case requeueItem:
gc.attemptToDelete.AddRateLimited(item)
}
return true
}
type workQueueItemAction int
const (
requeueItem = iota
forgetItem
)
func (gc *GarbageCollector) attemptToDeleteWorker(ctx context.Context, item interface{}) workQueueItemAction {
n, ok := item.(*node)
if !ok {
utilruntime.HandleError(fmt.Errorf("expect *node, got %#v", item))
return true
return forgetItem
}
if !n.isObserved() {
@@ -316,23 +336,23 @@ func (gc *GarbageCollector) attemptToDeleteWorker(ctx context.Context) bool {
// this can happen if attemptToDelete loops on a requeued virtual node because attemptToDeleteItem returned an error,
// and in the meantime a deletion of the real object associated with that uid was observed
klog.V(5).Infof("item %s no longer in the graph, skipping attemptToDeleteItem", n)
return true
return forgetItem
}
if nodeFromGraph.isObserved() {
// this can happen if attemptToDelete loops on a requeued virtual node because attemptToDeleteItem returned an error,
// and in the meantime the real object associated with that uid was observed
klog.V(5).Infof("item %s no longer virtual in the graph, skipping attemptToDeleteItem on virtual node", n)
return true
return forgetItem
}
}
err := gc.attemptToDeleteItem(ctx, n)
if err == enqueuedVirtualDeleteEventErr {
// a virtual event was produced and will be handled by processGraphChanges, no need to requeue this node
return true
return forgetItem
} else if err == namespacedOwnerOfClusterScopedObjectErr {
// a cluster-scoped object referring to a namespaced owner is an error that will not resolve on retry, no need to requeue this node
return true
return forgetItem
} else if err != nil {
if _, ok := err.(*restMappingError); ok {
// There are at least two ways this can happen:
@@ -347,15 +367,16 @@ func (gc *GarbageCollector) attemptToDeleteWorker(ctx context.Context) bool {
utilruntime.HandleError(fmt.Errorf("error syncing item %s: %v", n, err))
}
// retry if garbage collection of an object failed.
gc.attemptToDelete.AddRateLimited(item)
return requeueItem
} else if !n.isObserved() {
// requeue if item hasn't been observed via an informer event yet.
// otherwise a virtual node for an item added AND removed during watch reestablishment can get stuck in the graph and never removed.
// see https://issue.k8s.io/56121
klog.V(5).Infof("item %s hasn't been observed via informer yet", n.identity)
gc.attemptToDelete.AddRateLimited(item)
return requeueItem
}
return true
return forgetItem
}
// isDangling check if a reference is pointing to an object that doesn't exist.
@@ -639,16 +660,16 @@ func (gc *GarbageCollector) orphanDependents(owner objectReference, dependents [
}
func (gc *GarbageCollector) runAttemptToOrphanWorker() {
for gc.attemptToOrphanWorker() {
for gc.processAttemptToOrphanWorker() {
}
}
// attemptToOrphanWorker dequeues a node from the attemptToOrphan, then finds its
// processAttemptToOrphanWorker dequeues a node from the attemptToOrphan, then finds its
// dependents based on the graph maintained by the GC, then removes it from the
// OwnerReferences of its dependents, and finally updates the owner to remove
// the "Orphan" finalizer. The node is added back into the attemptToOrphan if any of
// these steps fail.
func (gc *GarbageCollector) attemptToOrphanWorker() bool {
func (gc *GarbageCollector) processAttemptToOrphanWorker() bool {
item, quit := gc.attemptToOrphan.Get()
gc.workerLock.RLock()
defer gc.workerLock.RUnlock()
@@ -656,10 +677,23 @@ func (gc *GarbageCollector) attemptToOrphanWorker() bool {
return false
}
defer gc.attemptToOrphan.Done(item)
action := gc.attemptToOrphanWorker(item)
switch action {
case forgetItem:
gc.attemptToOrphan.Forget(item)
case requeueItem:
gc.attemptToOrphan.AddRateLimited(item)
}
return true
}
func (gc *GarbageCollector) attemptToOrphanWorker(item interface{}) workQueueItemAction {
owner, ok := item.(*node)
if !ok {
utilruntime.HandleError(fmt.Errorf("expect *node, got %#v", item))
return true
return forgetItem
}
// we don't need to lock each element, because they never get updated
owner.dependentsLock.RLock()
@@ -672,16 +706,15 @@ func (gc *GarbageCollector) attemptToOrphanWorker() bool {
err := gc.orphanDependents(owner.identity, dependents)
if err != nil {
utilruntime.HandleError(fmt.Errorf("orphanDependents for %s failed with %v", owner.identity, err))
gc.attemptToOrphan.AddRateLimited(item)
return true
return requeueItem
}
// update the owner, remove "orphaningFinalizer" from its finalizers list
err = gc.removeFinalizer(owner, metav1.FinalizerOrphanDependents)
if err != nil {
utilruntime.HandleError(fmt.Errorf("removeOrphanFinalizer for %s failed with %v", owner.identity, err))
gc.attemptToOrphan.AddRateLimited(item)
return requeueItem
}
return true
return forgetItem
}
// *FOR TEST USE ONLY*


@@ -460,7 +460,7 @@ func TestDependentsRace(t *testing.T) {
defer wg.Done()
for i := 0; i < updates; i++ {
gc.attemptToOrphan.Add(owner)
gc.attemptToOrphanWorker()
gc.processAttemptToOrphanWorker()
}
}()
wg.Wait()
@@ -2445,7 +2445,7 @@ func processAttemptToDelete(count int) step {
if count <= 0 {
// process all
for ctx.gc.dependencyGraphBuilder.attemptToDelete.Len() != 0 {
ctx.gc.attemptToDeleteWorker(context.TODO())
ctx.gc.processAttemptToDeleteWorker(context.TODO())
}
} else {
for i := 0; i < count; i++ {
@@ -2453,7 +2453,7 @@ func processAttemptToDelete(count int) step {
ctx.t.Errorf("expected at least %d pending changes, got %d", count, i+1)
return
}
ctx.gc.attemptToDeleteWorker(context.TODO())
ctx.gc.processAttemptToDeleteWorker(context.TODO())
}
}
},
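
A side benefit of the split, shown here as a hedged sketch against the standalone example above (this test is not part of the commit): the decision logic can now be unit-tested by asserting the returned action directly, with no workqueue involved.

package main

import "testing"

func TestHandleForgetsMalformedItems(t *testing.T) {
	// No queue needed: handle never touches it, it only reports an action.
	c := &controller{}
	if got := c.handle(42); got != forgetItem {
		t.Fatalf("expected forgetItem for a malformed item, got %v", got)
	}
}
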