diff --git a/pkg/controller/garbagecollector/garbagecollector.go b/pkg/controller/garbagecollector/garbagecollector.go index f132b26540b..fe05899fb20 100644 --- a/pkg/controller/garbagecollector/garbagecollector.go +++ b/pkg/controller/garbagecollector/garbagecollector.go @@ -157,7 +157,7 @@ func (p *Propagator) addDependentToOwners(n *node, owners []metatypes.OwnerRefer } glog.V(6).Infof("add virtual node.identity: %s\n\n", ownerNode.identity) p.uidToNode.Write(ownerNode) - p.gc.dirtyQueue.Add(ownerNode) + p.gc.dirtyQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: p.gc.clock.Now(), Object: ownerNode}) } ownerNode.addDependent(n) } @@ -214,7 +214,7 @@ func referencesDiffs(old []metatypes.OwnerReference, new []metatypes.OwnerRefere return added, removed } -func shouldOrphanDependents(e event, accessor meta.Object) bool { +func shouldOrphanDependents(e *event, accessor meta.Object) bool { // The delta_fifo may combine the creation and update of the object into one // event, so we need to check AddEvent as well. if e.oldObj == nil { @@ -311,14 +311,14 @@ func (gc *GarbageCollector) removeOrphanFinalizer(owner *node) error { // the "Orphan" finalizer. The node is add back into the orphanQueue if any of // these steps fail. func (gc *GarbageCollector) orphanFinalizer() { - key, start, quit := gc.orphanQueue.Get() + timedItem, quit := gc.orphanQueue.Get() if quit { return } - defer gc.orphanQueue.Done(key) - owner, ok := key.(*node) + defer gc.orphanQueue.Done(timedItem) + owner, ok := timedItem.Object.(*node) if !ok { - utilruntime.HandleError(fmt.Errorf("expect *node, got %#v", key)) + utilruntime.HandleError(fmt.Errorf("expect *node, got %#v", timedItem.Object)) } // we don't need to lock each element, because they never get updated owner.dependentsLock.RLock() @@ -331,28 +331,28 @@ func (gc *GarbageCollector) orphanFinalizer() { err := gc.orhpanDependents(owner.identity, dependents) if err != nil { glog.V(6).Infof("orphanDependents for %s failed with %v", owner.identity, err) - gc.orphanQueue.AddWithTimestamp(owner, start) + gc.orphanQueue.Add(timedItem) return } // update the owner, remove "orphaningFinalizer" from its finalizers list err = gc.removeOrphanFinalizer(owner) if err != nil { glog.V(6).Infof("removeOrphanFinalizer for %s failed with %v", owner.identity, err) - gc.orphanQueue.AddWithTimestamp(owner, start) + gc.orphanQueue.Add(timedItem) } - OrphanProcessingLatency.Observe(sinceInMicroseconds(gc.clock, start)) + OrphanProcessingLatency.Observe(sinceInMicroseconds(gc.clock, timedItem.StartTime)) } // Dequeueing an event from eventQueue, updating graph, populating dirty_queue. func (p *Propagator) processEvent() { - key, start, quit := p.eventQueue.Get() + timedItem, quit := p.eventQueue.Get() if quit { return } - defer p.eventQueue.Done(key) - event, ok := key.(event) + defer p.eventQueue.Done(timedItem) + event, ok := timedItem.Object.(*event) if !ok { - utilruntime.HandleError(fmt.Errorf("expect an event, got %v", key)) + utilruntime.HandleError(fmt.Errorf("expect a *event, got %v", timedItem.Object)) return } obj := event.obj @@ -388,14 +388,14 @@ func (p *Propagator) processEvent() { // the underlying delta_fifo may combine a creation and deletion into one event if shouldOrphanDependents(event, accessor) { glog.V(6).Infof("add %s to the orphanQueue", newNode.identity) - p.gc.orphanQueue.Add(newNode) + p.gc.orphanQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: p.gc.clock.Now(), Object: newNode}) } case (event.eventType == addEvent || event.eventType == updateEvent) && found: // caveat: if GC observes the creation of the dependents later than the // deletion of the owner, then the orphaning finalizer won't be effective. if shouldOrphanDependents(event, accessor) { glog.V(6).Infof("add %s to the orphanQueue", existingNode.identity) - p.gc.orphanQueue.Add(existingNode) + p.gc.orphanQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: p.gc.clock.Now(), Object: existingNode}) } // add/remove owner refs added, removed := referencesDiffs(existingNode.owners, accessor.GetOwnerReferences()) @@ -419,10 +419,10 @@ func (p *Propagator) processEvent() { existingNode.dependentsLock.RLock() defer existingNode.dependentsLock.RUnlock() for dep := range existingNode.dependents { - p.gc.dirtyQueue.Add(dep) + p.gc.dirtyQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: p.gc.clock.Now(), Object: dep}) } } - EventProcessingLatency.Observe(sinceInMicroseconds(p.gc.clock, start)) + EventProcessingLatency.Observe(sinceInMicroseconds(p.gc.clock, timedItem.StartTime)) } // GarbageCollector is responsible for carrying out cascading deletion, and @@ -494,17 +494,17 @@ func (gc *GarbageCollector) monitorFor(resource unversioned.GroupVersionResource // add the event to the propagator's eventQueue. AddFunc: func(obj interface{}) { setObjectTypeMeta(obj) - event := event{ + event := &event{ eventType: addEvent, obj: obj, } - gc.propagator.eventQueue.Add(event) + gc.propagator.eventQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: gc.clock.Now(), Object: event}) }, UpdateFunc: func(oldObj, newObj interface{}) { setObjectTypeMeta(newObj) setObjectTypeMeta(oldObj) - event := event{updateEvent, newObj, oldObj} - gc.propagator.eventQueue.Add(event) + event := &event{updateEvent, newObj, oldObj} + gc.propagator.eventQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: gc.clock.Now(), Object: event}) }, DeleteFunc: func(obj interface{}) { // delta fifo may wrap the object in a cache.DeletedFinalStateUnknown, unwrap it @@ -512,11 +512,11 @@ func (gc *GarbageCollector) monitorFor(resource unversioned.GroupVersionResource obj = deletedFinalStateUnknown.Obj } setObjectTypeMeta(obj) - event := event{ + event := &event{ eventType: deleteEvent, obj: obj, } - gc.propagator.eventQueue.Add(event) + gc.propagator.eventQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: gc.clock.Now(), Object: event}) }, }, ) @@ -533,20 +533,19 @@ var ignoredResources = map[unversioned.GroupVersionResource]struct{}{ } func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynamic.ClientPool, resources []unversioned.GroupVersionResource) (*GarbageCollector, error) { - clock := clock.RealClock{} gc := &GarbageCollector{ metaOnlyClientPool: metaOnlyClientPool, clientPool: clientPool, // TODO: should use a dynamic RESTMapper built from the discovery results. restMapper: registered.RESTMapper(), - clock: clock, - dirtyQueue: workqueue.NewTimedWorkQueue(clock), - orphanQueue: workqueue.NewTimedWorkQueue(clock), + clock: clock.RealClock{}, + dirtyQueue: workqueue.NewTimedWorkQueue(), + orphanQueue: workqueue.NewTimedWorkQueue(), registeredRateLimiter: NewRegisteredRateLimiter(), registeredRateLimiterForMonitors: NewRegisteredRateLimiter(), } gc.propagator = &Propagator{ - eventQueue: workqueue.NewTimedWorkQueue(gc.clock), + eventQueue: workqueue.NewTimedWorkQueue(), uidToNode: &concurrentUIDToNode{ RWMutex: &sync.RWMutex{}, uidToNode: make(map[types.UID]*node), @@ -572,16 +571,16 @@ func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynam } func (gc *GarbageCollector) worker() { - key, start, quit := gc.dirtyQueue.Get() + timedItem, quit := gc.dirtyQueue.Get() if quit { return } - defer gc.dirtyQueue.Done(key) - err := gc.processItem(key.(*node)) + defer gc.dirtyQueue.Done(timedItem) + err := gc.processItem(timedItem.Object.(*node)) if err != nil { - utilruntime.HandleError(fmt.Errorf("Error syncing item %#v: %v", key, err)) + utilruntime.HandleError(fmt.Errorf("Error syncing item %#v: %v", timedItem.Object, err)) } - DirtyProcessingLatency.Observe(sinceInMicroseconds(gc.clock, start)) + DirtyProcessingLatency.Observe(sinceInMicroseconds(gc.clock, timedItem.StartTime)) } // apiResource consults the REST mapper to translate an