fix memory leak in gc

This commit is contained in:
Chao Xu
2016-08-18 15:22:42 -07:00
parent 2166ce2fdc
commit 65d1dbe8d9
4 changed files with 97 additions and 70 deletions

View File

@@ -157,7 +157,7 @@ func (p *Propagator) addDependentToOwners(n *node, owners []metatypes.OwnerRefer
}
glog.V(6).Infof("add virtual node.identity: %s\n\n", ownerNode.identity)
p.uidToNode.Write(ownerNode)
p.gc.dirtyQueue.Add(ownerNode)
p.gc.dirtyQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: p.gc.clock.Now(), Object: ownerNode})
}
ownerNode.addDependent(n)
}
@@ -214,7 +214,7 @@ func referencesDiffs(old []metatypes.OwnerReference, new []metatypes.OwnerRefere
return added, removed
}
func shouldOrphanDependents(e event, accessor meta.Object) bool {
func shouldOrphanDependents(e *event, accessor meta.Object) bool {
// The delta_fifo may combine the creation and update of the object into one
// event, so we need to check AddEvent as well.
if e.oldObj == nil {
@@ -311,14 +311,14 @@ func (gc *GarbageCollector) removeOrphanFinalizer(owner *node) error {
// the "Orphan" finalizer. The node is add back into the orphanQueue if any of
// these steps fail.
func (gc *GarbageCollector) orphanFinalizer() {
key, start, quit := gc.orphanQueue.Get()
timedItem, quit := gc.orphanQueue.Get()
if quit {
return
}
defer gc.orphanQueue.Done(key)
owner, ok := key.(*node)
defer gc.orphanQueue.Done(timedItem)
owner, ok := timedItem.Object.(*node)
if !ok {
utilruntime.HandleError(fmt.Errorf("expect *node, got %#v", key))
utilruntime.HandleError(fmt.Errorf("expect *node, got %#v", timedItem.Object))
}
// we don't need to lock each element, because they never get updated
owner.dependentsLock.RLock()
@@ -331,28 +331,28 @@ func (gc *GarbageCollector) orphanFinalizer() {
err := gc.orhpanDependents(owner.identity, dependents)
if err != nil {
glog.V(6).Infof("orphanDependents for %s failed with %v", owner.identity, err)
gc.orphanQueue.AddWithTimestamp(owner, start)
gc.orphanQueue.Add(timedItem)
return
}
// update the owner, remove "orphaningFinalizer" from its finalizers list
err = gc.removeOrphanFinalizer(owner)
if err != nil {
glog.V(6).Infof("removeOrphanFinalizer for %s failed with %v", owner.identity, err)
gc.orphanQueue.AddWithTimestamp(owner, start)
gc.orphanQueue.Add(timedItem)
}
OrphanProcessingLatency.Observe(sinceInMicroseconds(gc.clock, start))
OrphanProcessingLatency.Observe(sinceInMicroseconds(gc.clock, timedItem.StartTime))
}
// Dequeueing an event from eventQueue, updating graph, populating dirty_queue.
func (p *Propagator) processEvent() {
key, start, quit := p.eventQueue.Get()
timedItem, quit := p.eventQueue.Get()
if quit {
return
}
defer p.eventQueue.Done(key)
event, ok := key.(event)
defer p.eventQueue.Done(timedItem)
event, ok := timedItem.Object.(*event)
if !ok {
utilruntime.HandleError(fmt.Errorf("expect an event, got %v", key))
utilruntime.HandleError(fmt.Errorf("expect a *event, got %v", timedItem.Object))
return
}
obj := event.obj
@@ -388,14 +388,14 @@ func (p *Propagator) processEvent() {
// the underlying delta_fifo may combine a creation and deletion into one event
if shouldOrphanDependents(event, accessor) {
glog.V(6).Infof("add %s to the orphanQueue", newNode.identity)
p.gc.orphanQueue.Add(newNode)
p.gc.orphanQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: p.gc.clock.Now(), Object: newNode})
}
case (event.eventType == addEvent || event.eventType == updateEvent) && found:
// caveat: if GC observes the creation of the dependents later than the
// deletion of the owner, then the orphaning finalizer won't be effective.
if shouldOrphanDependents(event, accessor) {
glog.V(6).Infof("add %s to the orphanQueue", existingNode.identity)
p.gc.orphanQueue.Add(existingNode)
p.gc.orphanQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: p.gc.clock.Now(), Object: existingNode})
}
// add/remove owner refs
added, removed := referencesDiffs(existingNode.owners, accessor.GetOwnerReferences())
@@ -419,10 +419,10 @@ func (p *Propagator) processEvent() {
existingNode.dependentsLock.RLock()
defer existingNode.dependentsLock.RUnlock()
for dep := range existingNode.dependents {
p.gc.dirtyQueue.Add(dep)
p.gc.dirtyQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: p.gc.clock.Now(), Object: dep})
}
}
EventProcessingLatency.Observe(sinceInMicroseconds(p.gc.clock, start))
EventProcessingLatency.Observe(sinceInMicroseconds(p.gc.clock, timedItem.StartTime))
}
// GarbageCollector is responsible for carrying out cascading deletion, and
@@ -494,17 +494,17 @@ func (gc *GarbageCollector) monitorFor(resource unversioned.GroupVersionResource
// add the event to the propagator's eventQueue.
AddFunc: func(obj interface{}) {
setObjectTypeMeta(obj)
event := event{
event := &event{
eventType: addEvent,
obj: obj,
}
gc.propagator.eventQueue.Add(event)
gc.propagator.eventQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: gc.clock.Now(), Object: event})
},
UpdateFunc: func(oldObj, newObj interface{}) {
setObjectTypeMeta(newObj)
setObjectTypeMeta(oldObj)
event := event{updateEvent, newObj, oldObj}
gc.propagator.eventQueue.Add(event)
event := &event{updateEvent, newObj, oldObj}
gc.propagator.eventQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: gc.clock.Now(), Object: event})
},
DeleteFunc: func(obj interface{}) {
// delta fifo may wrap the object in a cache.DeletedFinalStateUnknown, unwrap it
@@ -512,11 +512,11 @@ func (gc *GarbageCollector) monitorFor(resource unversioned.GroupVersionResource
obj = deletedFinalStateUnknown.Obj
}
setObjectTypeMeta(obj)
event := event{
event := &event{
eventType: deleteEvent,
obj: obj,
}
gc.propagator.eventQueue.Add(event)
gc.propagator.eventQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: gc.clock.Now(), Object: event})
},
},
)
@@ -533,20 +533,19 @@ var ignoredResources = map[unversioned.GroupVersionResource]struct{}{
}
func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynamic.ClientPool, resources []unversioned.GroupVersionResource) (*GarbageCollector, error) {
clock := clock.RealClock{}
gc := &GarbageCollector{
metaOnlyClientPool: metaOnlyClientPool,
clientPool: clientPool,
// TODO: should use a dynamic RESTMapper built from the discovery results.
restMapper: registered.RESTMapper(),
clock: clock,
dirtyQueue: workqueue.NewTimedWorkQueue(clock),
orphanQueue: workqueue.NewTimedWorkQueue(clock),
clock: clock.RealClock{},
dirtyQueue: workqueue.NewTimedWorkQueue(),
orphanQueue: workqueue.NewTimedWorkQueue(),
registeredRateLimiter: NewRegisteredRateLimiter(),
registeredRateLimiterForMonitors: NewRegisteredRateLimiter(),
}
gc.propagator = &Propagator{
eventQueue: workqueue.NewTimedWorkQueue(gc.clock),
eventQueue: workqueue.NewTimedWorkQueue(),
uidToNode: &concurrentUIDToNode{
RWMutex: &sync.RWMutex{},
uidToNode: make(map[types.UID]*node),
@@ -572,16 +571,16 @@ func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynam
}
func (gc *GarbageCollector) worker() {
key, start, quit := gc.dirtyQueue.Get()
timedItem, quit := gc.dirtyQueue.Get()
if quit {
return
}
defer gc.dirtyQueue.Done(key)
err := gc.processItem(key.(*node))
defer gc.dirtyQueue.Done(timedItem)
err := gc.processItem(timedItem.Object.(*node))
if err != nil {
utilruntime.HandleError(fmt.Errorf("Error syncing item %#v: %v", key, err))
utilruntime.HandleError(fmt.Errorf("Error syncing item %#v: %v", timedItem.Object, err))
}
DirtyProcessingLatency.Observe(sinceInMicroseconds(gc.clock, start))
DirtyProcessingLatency.Observe(sinceInMicroseconds(gc.clock, timedItem.StartTime))
}
// apiResource consults the REST mapper to translate an <apiVersion, kind,
@@ -681,24 +680,24 @@ func (gc *GarbageCollector) processItem(item *node) error {
// exist yet, so we need to enqueue a virtual Delete event to remove
// the virtual node from Propagator.uidToNode.
glog.V(6).Infof("item %v not found, generating a virtual delete event", item.identity)
event := event{
event := &event{
eventType: deleteEvent,
obj: objectReferenceToMetadataOnlyObject(item.identity),
}
glog.V(6).Infof("generating virtual delete event for %s\n\n", event.obj)
gc.propagator.eventQueue.Add(event)
gc.propagator.eventQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: gc.clock.Now(), Object: event})
return nil
}
return err
}
if latest.GetUID() != item.identity.UID {
glog.V(6).Infof("UID doesn't match, item %v not found, generating a virtual delete event", item.identity)
event := event{
event := &event{
eventType: deleteEvent,
obj: objectReferenceToMetadataOnlyObject(item.identity),
}
glog.V(6).Infof("generating virtual delete event for %s\n\n", event.obj)
gc.propagator.eventQueue.Add(event)
gc.propagator.eventQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: gc.clock.Now(), Object: event})
return nil
}
ownerReferences := latest.GetOwnerReferences()