Mirror of https://github.com/k3s-io/kubernetes.git, synced 2025-07-26 05:03:09 +00:00
Merge pull request #106029 from astraw99/fix-workqueue-forget
Add GC workqueue `Forget` to stop the rate limiter
This commit is contained in: cc16e7792f
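
For context: the change renames the queue-handling loops to `process*` wrappers and has the inner workers report whether an item should be forgotten or requeued, so the wrapper can call `Forget` on success instead of leaving the rate limiter's per-item failure history in place. Below is a minimal, standalone sketch of that client-go rate-limiting workqueue pattern, not the garbage collector code itself; the names `worker` and `process` are illustrative only.

```go
// Sketch of the Get/Done/Forget/AddRateLimited pattern from
// k8s.io/client-go/util/workqueue that this commit adopts for the GC workers.
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

// worker processes one item; returns false when the queue is shutting down.
func worker(queue workqueue.RateLimitingInterface, process func(item interface{}) error) bool {
	item, quit := queue.Get()
	if quit {
		return false
	}
	defer queue.Done(item) // always mark the item finished for this round

	if err := process(item); err != nil {
		queue.AddRateLimited(item) // retry later, with per-item backoff
		return true
	}
	queue.Forget(item) // success: clear the rate limiter's memory of this item
	return true
}

func main() {
	queue := workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "demo")
	queue.Add("object-uid")
	worker(queue, func(item interface{}) error {
		fmt.Println("processed", item)
		return nil
	})
	queue.ShutDown()
}
```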
@@ -288,7 +288,7 @@ func (gc *GarbageCollector) IsSynced() bool {
 }
 
 func (gc *GarbageCollector) runAttemptToDeleteWorker(ctx context.Context) {
-    for gc.attemptToDeleteWorker(ctx) {
+    for gc.processAttemptToDeleteWorker(ctx) {
     }
 }
 
@@ -296,7 +296,7 @@ var enqueuedVirtualDeleteEventErr = goerrors.New("enqueued virtual delete event"
 
 var namespacedOwnerOfClusterScopedObjectErr = goerrors.New("cluster-scoped objects cannot refer to namespaced owners")
 
-func (gc *GarbageCollector) attemptToDeleteWorker(ctx context.Context) bool {
+func (gc *GarbageCollector) processAttemptToDeleteWorker(ctx context.Context) bool {
     item, quit := gc.attemptToDelete.Get()
     gc.workerLock.RLock()
     defer gc.workerLock.RUnlock()
@@ -304,10 +304,30 @@ func (gc *GarbageCollector) attemptToDeleteWorker(ctx context.Context) bool {
         return false
     }
     defer gc.attemptToDelete.Done(item)
+
+    action := gc.attemptToDeleteWorker(ctx, item)
+    switch action {
+    case forgetItem:
+        gc.attemptToDelete.Forget(item)
+    case requeueItem:
+        gc.attemptToDelete.AddRateLimited(item)
+    }
+
+    return true
+}
+
+type workQueueItemAction int
+
+const (
+    requeueItem = iota
+    forgetItem
+)
+
+func (gc *GarbageCollector) attemptToDeleteWorker(ctx context.Context, item interface{}) workQueueItemAction {
     n, ok := item.(*node)
     if !ok {
         utilruntime.HandleError(fmt.Errorf("expect *node, got %#v", item))
-        return true
+        return forgetItem
     }
 
     if !n.isObserved() {
@@ -316,23 +336,23 @@ func (gc *GarbageCollector) attemptToDeleteWorker(ctx context.Context) bool {
             // this can happen if attemptToDelete loops on a requeued virtual node because attemptToDeleteItem returned an error,
             // and in the meantime a deletion of the real object associated with that uid was observed
             klog.V(5).Infof("item %s no longer in the graph, skipping attemptToDeleteItem", n)
-            return true
+            return forgetItem
         }
         if nodeFromGraph.isObserved() {
             // this can happen if attemptToDelete loops on a requeued virtual node because attemptToDeleteItem returned an error,
             // and in the meantime the real object associated with that uid was observed
             klog.V(5).Infof("item %s no longer virtual in the graph, skipping attemptToDeleteItem on virtual node", n)
-            return true
+            return forgetItem
         }
     }
 
     err := gc.attemptToDeleteItem(ctx, n)
     if err == enqueuedVirtualDeleteEventErr {
         // a virtual event was produced and will be handled by processGraphChanges, no need to requeue this node
-        return true
+        return forgetItem
     } else if err == namespacedOwnerOfClusterScopedObjectErr {
         // a cluster-scoped object referring to a namespaced owner is an error that will not resolve on retry, no need to requeue this node
-        return true
+        return forgetItem
     } else if err != nil {
         if _, ok := err.(*restMappingError); ok {
             // There are at least two ways this can happen:
@@ -347,15 +367,16 @@ func (gc *GarbageCollector) attemptToDeleteWorker(ctx context.Context) bool {
             utilruntime.HandleError(fmt.Errorf("error syncing item %s: %v", n, err))
         }
         // retry if garbage collection of an object failed.
-        gc.attemptToDelete.AddRateLimited(item)
+        return requeueItem
     } else if !n.isObserved() {
         // requeue if item hasn't been observed via an informer event yet.
         // otherwise a virtual node for an item added AND removed during watch reestablishment can get stuck in the graph and never removed.
        // see https://issue.k8s.io/56121
         klog.V(5).Infof("item %s hasn't been observed via informer yet", n.identity)
-        gc.attemptToDelete.AddRateLimited(item)
+        return requeueItem
     }
-    return true
+
+    return forgetItem
 }
 
 // isDangling check if a reference is pointing to an object that doesn't exist.
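
To see why the new `Forget` calls matter, here is a standalone sketch assuming client-go's `workqueue.DefaultControllerRateLimiter()`: each `AddRateLimited` grows the per-item backoff, and only `Forget` clears that history, so without it an item that once failed keeps paying an ever longer delay even after it starts succeeding. The item value is illustrative.

```go
// Demonstrates per-item backoff growth and the effect of Forget on the
// default controller rate limiter from k8s.io/client-go/util/workqueue.
package main

import (
	"fmt"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	limiter := workqueue.DefaultControllerRateLimiter()
	item := "object-uid"

	// Simulate three failed attempts: each requeue asks the limiter when to
	// retry, and the suggested delay keeps growing for this item.
	for i := 0; i < 3; i++ {
		delay := limiter.When(item)
		fmt.Printf("attempt %d: requeue after %v (failures so far: %d)\n",
			i+1, delay, limiter.NumRequeues(item))
	}

	// A successful attempt should call Forget, which resets the item's history.
	limiter.Forget(item)
	fmt.Printf("after Forget: failures=%d, next delay %v\n",
		limiter.NumRequeues(item), limiter.When(item))
}
```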
@@ -639,16 +660,16 @@ func (gc *GarbageCollector) orphanDependents(owner objectReference, dependents [
 }
 
 func (gc *GarbageCollector) runAttemptToOrphanWorker() {
-    for gc.attemptToOrphanWorker() {
+    for gc.processAttemptToOrphanWorker() {
     }
 }
 
-// attemptToOrphanWorker dequeues a node from the attemptToOrphan, then finds its
+// processAttemptToOrphanWorker dequeues a node from the attemptToOrphan, then finds its
 // dependents based on the graph maintained by the GC, then removes it from the
 // OwnerReferences of its dependents, and finally updates the owner to remove
 // the "Orphan" finalizer. The node is added back into the attemptToOrphan if any of
 // these steps fail.
-func (gc *GarbageCollector) attemptToOrphanWorker() bool {
+func (gc *GarbageCollector) processAttemptToOrphanWorker() bool {
     item, quit := gc.attemptToOrphan.Get()
     gc.workerLock.RLock()
     defer gc.workerLock.RUnlock()
@@ -656,10 +677,23 @@ func (gc *GarbageCollector) attemptToOrphanWorker() bool {
         return false
     }
     defer gc.attemptToOrphan.Done(item)
+
+    action := gc.attemptToOrphanWorker(item)
+    switch action {
+    case forgetItem:
+        gc.attemptToOrphan.Forget(item)
+    case requeueItem:
+        gc.attemptToOrphan.AddRateLimited(item)
+    }
+
+    return true
+}
+
+func (gc *GarbageCollector) attemptToOrphanWorker(item interface{}) workQueueItemAction {
     owner, ok := item.(*node)
     if !ok {
         utilruntime.HandleError(fmt.Errorf("expect *node, got %#v", item))
-        return true
+        return forgetItem
     }
     // we don't need to lock each element, because they never get updated
     owner.dependentsLock.RLock()
@@ -672,16 +706,15 @@ func (gc *GarbageCollector) attemptToOrphanWorker() bool {
     err := gc.orphanDependents(owner.identity, dependents)
     if err != nil {
         utilruntime.HandleError(fmt.Errorf("orphanDependents for %s failed with %v", owner.identity, err))
-        gc.attemptToOrphan.AddRateLimited(item)
-        return true
+        return requeueItem
     }
     // update the owner, remove "orphaningFinalizer" from its finalizers list
     err = gc.removeFinalizer(owner, metav1.FinalizerOrphanDependents)
     if err != nil {
         utilruntime.HandleError(fmt.Errorf("removeOrphanFinalizer for %s failed with %v", owner.identity, err))
-        gc.attemptToOrphan.AddRateLimited(item)
+        return requeueItem
     }
-    return true
+    return forgetItem
 }
 
 // *FOR TEST USE ONLY*
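
The orphan hunks above mirror the delete path: `processAttemptToOrphanWorker` owns the queue bookkeeping (`Get`/`Done`/`Forget`/`AddRateLimited`) and dispatches on the `workQueueItemAction` returned by `attemptToOrphanWorker`. The test hunks below (`TestDependentsRace` and the `processAttemptToDelete` step helper) are updated to call the renamed `process*` wrappers.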
@@ -460,7 +460,7 @@ func TestDependentsRace(t *testing.T) {
         defer wg.Done()
         for i := 0; i < updates; i++ {
             gc.attemptToOrphan.Add(owner)
-            gc.attemptToOrphanWorker()
+            gc.processAttemptToOrphanWorker()
         }
     }()
     wg.Wait()
@@ -2445,7 +2445,7 @@ func processAttemptToDelete(count int) step {
             if count <= 0 {
                 // process all
                 for ctx.gc.dependencyGraphBuilder.attemptToDelete.Len() != 0 {
-                    ctx.gc.attemptToDeleteWorker(context.TODO())
+                    ctx.gc.processAttemptToDeleteWorker(context.TODO())
                 }
             } else {
                 for i := 0; i < count; i++ {
@@ -2453,7 +2453,7 @@ func processAttemptToDelete(count int) step {
                         ctx.t.Errorf("expected at least %d pending changes, got %d", count, i+1)
                         return
                     }
-                    ctx.gc.attemptToDeleteWorker(context.TODO())
+                    ctx.gc.processAttemptToDeleteWorker(context.TODO())
                 }
             }
         },