Merge pull request #30943 from caesarxuchao/fix-gc-memory-leak

Automatic merge from submit-queue

Fix memory leak in gc

ref #30759

GC had a memory leak: work queue items were never deleted. The old TimedWorkQueue wrapped each object in a private timedWorkQueueItem on Add but returned the unwrapped object from Get, so the caller's Done call never matched the wrapped item and it stayed in the underlying queue's processing set forever.
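
To make the failure mode concrete, here is a minimal, self-contained sketch of that wrap-on-Add / unwrap-on-Done pattern. It is not the real workqueue code; the leakyQueue/wrapped names and the "pod-a" payload are invented for illustration.

package main

import (
	"fmt"
	"time"
)

// wrapped mimics the old private timedWorkQueueItem: a timestamp plus the payload.
type wrapped struct {
	start time.Time
	obj   interface{}
}

// leakyQueue mimics the relevant part of the underlying work queue:
// Get moves an item into the processing set, Done is supposed to remove it.
type leakyQueue struct {
	processing map[interface{}]struct{}
}

func (q *leakyQueue) markProcessing(item interface{}) { q.processing[item] = struct{}{} }
func (q *leakyQueue) done(item interface{})           { delete(q.processing, item) }

func main() {
	q := &leakyQueue{processing: map[interface{}]struct{}{}}

	obj := "pod-a"
	item := wrapped{start: time.Now(), obj: obj} // Add wraps the object...
	q.markProcessing(item)                       // ...Get tracks the *wrapped* item...
	q.done(obj)                                  // ...but Done is called with the *unwrapped* object.

	fmt.Println(len(q.processing)) // 1 — the wrapped item is never removed; this is the leak.
}

The fix below removes the hidden wrapping: callers now build a *TimedWorkQueueItem themselves and pass that same pointer to Add, Get, and Done.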

I'm still fighting with my kubemark cluster to get statistics after this fix.

@wojtek-t @lavalamp
Authored by Kubernetes Submit Queue on 2016-08-19 02:08:56 -07:00, committed by GitHub
commit 30b180e4a5
4 changed files with 97 additions and 70 deletions


@@ -157,7 +157,7 @@ func (p *Propagator) addDependentToOwners(n *node, owners []metatypes.OwnerRefer
 }
 glog.V(6).Infof("add virtual node.identity: %s\n\n", ownerNode.identity)
 p.uidToNode.Write(ownerNode)
-p.gc.dirtyQueue.Add(ownerNode)
+p.gc.dirtyQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: p.gc.clock.Now(), Object: ownerNode})
 }
 ownerNode.addDependent(n)
 }
@@ -214,7 +214,7 @@ func referencesDiffs(old []metatypes.OwnerReference, new []metatypes.OwnerRefere
 return added, removed
 }
-func shouldOrphanDependents(e event, accessor meta.Object) bool {
+func shouldOrphanDependents(e *event, accessor meta.Object) bool {
 // The delta_fifo may combine the creation and update of the object into one
 // event, so we need to check AddEvent as well.
 if e.oldObj == nil {
@@ -311,14 +311,14 @@ func (gc *GarbageCollector) removeOrphanFinalizer(owner *node) error {
 // the "Orphan" finalizer. The node is add back into the orphanQueue if any of
 // these steps fail.
 func (gc *GarbageCollector) orphanFinalizer() {
-key, start, quit := gc.orphanQueue.Get()
+timedItem, quit := gc.orphanQueue.Get()
 if quit {
 return
 }
-defer gc.orphanQueue.Done(key)
-owner, ok := key.(*node)
+defer gc.orphanQueue.Done(timedItem)
+owner, ok := timedItem.Object.(*node)
 if !ok {
-utilruntime.HandleError(fmt.Errorf("expect *node, got %#v", key))
+utilruntime.HandleError(fmt.Errorf("expect *node, got %#v", timedItem.Object))
 }
 // we don't need to lock each element, because they never get updated
 owner.dependentsLock.RLock()
@@ -331,28 +331,28 @@ func (gc *GarbageCollector) orphanFinalizer() {
 err := gc.orhpanDependents(owner.identity, dependents)
 if err != nil {
 glog.V(6).Infof("orphanDependents for %s failed with %v", owner.identity, err)
-gc.orphanQueue.AddWithTimestamp(owner, start)
+gc.orphanQueue.Add(timedItem)
 return
 }
 // update the owner, remove "orphaningFinalizer" from its finalizers list
 err = gc.removeOrphanFinalizer(owner)
 if err != nil {
 glog.V(6).Infof("removeOrphanFinalizer for %s failed with %v", owner.identity, err)
-gc.orphanQueue.AddWithTimestamp(owner, start)
+gc.orphanQueue.Add(timedItem)
 }
-OrphanProcessingLatency.Observe(sinceInMicroseconds(gc.clock, start))
+OrphanProcessingLatency.Observe(sinceInMicroseconds(gc.clock, timedItem.StartTime))
 }
 // Dequeueing an event from eventQueue, updating graph, populating dirty_queue.
 func (p *Propagator) processEvent() {
-key, start, quit := p.eventQueue.Get()
+timedItem, quit := p.eventQueue.Get()
 if quit {
 return
 }
-defer p.eventQueue.Done(key)
-event, ok := key.(event)
+defer p.eventQueue.Done(timedItem)
+event, ok := timedItem.Object.(*event)
 if !ok {
-utilruntime.HandleError(fmt.Errorf("expect an event, got %v", key))
+utilruntime.HandleError(fmt.Errorf("expect a *event, got %v", timedItem.Object))
 return
 }
 obj := event.obj
@@ -388,14 +388,14 @@ func (p *Propagator) processEvent() {
 // the underlying delta_fifo may combine a creation and deletion into one event
 if shouldOrphanDependents(event, accessor) {
 glog.V(6).Infof("add %s to the orphanQueue", newNode.identity)
-p.gc.orphanQueue.Add(newNode)
+p.gc.orphanQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: p.gc.clock.Now(), Object: newNode})
 }
 case (event.eventType == addEvent || event.eventType == updateEvent) && found:
 // caveat: if GC observes the creation of the dependents later than the
 // deletion of the owner, then the orphaning finalizer won't be effective.
 if shouldOrphanDependents(event, accessor) {
 glog.V(6).Infof("add %s to the orphanQueue", existingNode.identity)
-p.gc.orphanQueue.Add(existingNode)
+p.gc.orphanQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: p.gc.clock.Now(), Object: existingNode})
 }
 // add/remove owner refs
 added, removed := referencesDiffs(existingNode.owners, accessor.GetOwnerReferences())
@@ -419,10 +419,10 @@ func (p *Propagator) processEvent() {
 existingNode.dependentsLock.RLock()
 defer existingNode.dependentsLock.RUnlock()
 for dep := range existingNode.dependents {
-p.gc.dirtyQueue.Add(dep)
+p.gc.dirtyQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: p.gc.clock.Now(), Object: dep})
 }
 }
-EventProcessingLatency.Observe(sinceInMicroseconds(p.gc.clock, start))
+EventProcessingLatency.Observe(sinceInMicroseconds(p.gc.clock, timedItem.StartTime))
 }
 // GarbageCollector is responsible for carrying out cascading deletion, and
@@ -494,17 +494,17 @@ func (gc *GarbageCollector) monitorFor(resource unversioned.GroupVersionResource
 // add the event to the propagator's eventQueue.
 AddFunc: func(obj interface{}) {
 setObjectTypeMeta(obj)
-event := event{
+event := &event{
 eventType: addEvent,
 obj: obj,
 }
-gc.propagator.eventQueue.Add(event)
+gc.propagator.eventQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: gc.clock.Now(), Object: event})
 },
 UpdateFunc: func(oldObj, newObj interface{}) {
 setObjectTypeMeta(newObj)
 setObjectTypeMeta(oldObj)
-event := event{updateEvent, newObj, oldObj}
-gc.propagator.eventQueue.Add(event)
+event := &event{updateEvent, newObj, oldObj}
+gc.propagator.eventQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: gc.clock.Now(), Object: event})
 },
 DeleteFunc: func(obj interface{}) {
 // delta fifo may wrap the object in a cache.DeletedFinalStateUnknown, unwrap it
@@ -512,11 +512,11 @@ func (gc *GarbageCollector) monitorFor(resource unversioned.GroupVersionResource
 obj = deletedFinalStateUnknown.Obj
 }
 setObjectTypeMeta(obj)
-event := event{
+event := &event{
 eventType: deleteEvent,
 obj: obj,
 }
-gc.propagator.eventQueue.Add(event)
+gc.propagator.eventQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: gc.clock.Now(), Object: event})
 },
 },
 )
@@ -533,20 +533,19 @@ var ignoredResources = map[unversioned.GroupVersionResource]struct{}{
 }
 func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynamic.ClientPool, resources []unversioned.GroupVersionResource) (*GarbageCollector, error) {
-clock := clock.RealClock{}
 gc := &GarbageCollector{
 metaOnlyClientPool: metaOnlyClientPool,
 clientPool: clientPool,
 // TODO: should use a dynamic RESTMapper built from the discovery results.
 restMapper: registered.RESTMapper(),
-clock: clock,
-dirtyQueue: workqueue.NewTimedWorkQueue(clock),
-orphanQueue: workqueue.NewTimedWorkQueue(clock),
+clock: clock.RealClock{},
+dirtyQueue: workqueue.NewTimedWorkQueue(),
+orphanQueue: workqueue.NewTimedWorkQueue(),
 registeredRateLimiter: NewRegisteredRateLimiter(),
 registeredRateLimiterForMonitors: NewRegisteredRateLimiter(),
 }
 gc.propagator = &Propagator{
-eventQueue: workqueue.NewTimedWorkQueue(gc.clock),
+eventQueue: workqueue.NewTimedWorkQueue(),
 uidToNode: &concurrentUIDToNode{
 RWMutex: &sync.RWMutex{},
 uidToNode: make(map[types.UID]*node),
@@ -572,16 +571,16 @@ func NewGarbageCollector(metaOnlyClientPool dynamic.ClientPool, clientPool dynam
 }
 func (gc *GarbageCollector) worker() {
-key, start, quit := gc.dirtyQueue.Get()
+timedItem, quit := gc.dirtyQueue.Get()
 if quit {
 return
 }
-defer gc.dirtyQueue.Done(key)
-err := gc.processItem(key.(*node))
+defer gc.dirtyQueue.Done(timedItem)
+err := gc.processItem(timedItem.Object.(*node))
 if err != nil {
-utilruntime.HandleError(fmt.Errorf("Error syncing item %#v: %v", key, err))
+utilruntime.HandleError(fmt.Errorf("Error syncing item %#v: %v", timedItem.Object, err))
 }
-DirtyProcessingLatency.Observe(sinceInMicroseconds(gc.clock, start))
+DirtyProcessingLatency.Observe(sinceInMicroseconds(gc.clock, timedItem.StartTime))
 }
 // apiResource consults the REST mapper to translate an <apiVersion, kind,
@@ -681,24 +680,24 @@ func (gc *GarbageCollector) processItem(item *node) error {
 // exist yet, so we need to enqueue a virtual Delete event to remove
 // the virtual node from Propagator.uidToNode.
 glog.V(6).Infof("item %v not found, generating a virtual delete event", item.identity)
-event := event{
+event := &event{
 eventType: deleteEvent,
 obj: objectReferenceToMetadataOnlyObject(item.identity),
 }
 glog.V(6).Infof("generating virtual delete event for %s\n\n", event.obj)
-gc.propagator.eventQueue.Add(event)
+gc.propagator.eventQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: gc.clock.Now(), Object: event})
 return nil
 }
 return err
 }
 if latest.GetUID() != item.identity.UID {
 glog.V(6).Infof("UID doesn't match, item %v not found, generating a virtual delete event", item.identity)
-event := event{
+event := &event{
 eventType: deleteEvent,
 obj: objectReferenceToMetadataOnlyObject(item.identity),
 }
 glog.V(6).Infof("generating virtual delete event for %s\n\n", event.obj)
-gc.propagator.eventQueue.Add(event)
+gc.propagator.eventQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: gc.clock.Now(), Object: event})
 return nil
 }
 ownerReferences := latest.GetOwnerReferences()


@@ -283,18 +283,18 @@ func TestProcessEvent(t *testing.T) {
 for _, scenario := range testScenarios {
 propagator := &Propagator{
-eventQueue: workqueue.NewTimedWorkQueue(clock.RealClock{}),
+eventQueue: workqueue.NewTimedWorkQueue(),
 uidToNode: &concurrentUIDToNode{
 RWMutex: &sync.RWMutex{},
 uidToNode: make(map[types.UID]*node),
 },
 gc: &GarbageCollector{
-dirtyQueue: workqueue.NewTimedWorkQueue(clock.RealClock{}),
+dirtyQueue: workqueue.NewTimedWorkQueue(),
 clock: clock.RealClock{},
 },
 }
 for i := 0; i < len(scenario.events); i++ {
-propagator.eventQueue.Add(scenario.events[i])
+propagator.eventQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: propagator.gc.clock.Now(), Object: &scenario.events[i]})
 propagator.processEvent()
 verifyGraphInvariants(scenario.name, propagator.uidToNode.uidToNode, t)
 }
@@ -327,7 +327,7 @@ func TestDependentsRace(t *testing.T) {
 }
 }()
 go func() {
-gc.orphanQueue.Add(owner)
+gc.orphanQueue.Add(&workqueue.TimedWorkQueueItem{StartTime: gc.clock.Now(), Object: owner})
 for i := 0; i < updates; i++ {
 gc.orphanFinalizer()
 }


@@ -16,47 +16,37 @@ limitations under the License.
 package workqueue
-import (
-"time"
-"k8s.io/kubernetes/pkg/util/clock"
-)
+import "time"
 type TimedWorkQueue struct {
 *Type
-clock clock.Clock
 }
-type timedWorkQueueItem struct {
-time time.Time
-obj interface{}
+type TimedWorkQueueItem struct {
+StartTime time.Time
+Object interface{}
 }
-func NewTimedWorkQueue(clock clock.Clock) *TimedWorkQueue {
-return &TimedWorkQueue{New(), clock}
+func NewTimedWorkQueue() *TimedWorkQueue {
+return &TimedWorkQueue{New()}
 }
 // Add adds the obj along with the current timestamp to the queue.
-func (q TimedWorkQueue) Add(obj interface{}) {
-start := q.clock.Now()
-item := timedWorkQueueItem{start, obj}
-q.Type.Add(item)
-}
-// AddWithTimestamp is useful if the caller does not want to refresh the start
-// time when requeuing an item.
-func (q TimedWorkQueue) AddWithTimestamp(obj interface{}, timestamp time.Time) {
-item := timedWorkQueueItem{timestamp, obj}
-q.Type.Add(item)
+func (q TimedWorkQueue) Add(timedItem *TimedWorkQueueItem) {
+q.Type.Add(timedItem)
 }
 // Get gets the obj along with its timestamp from the queue.
-func (q TimedWorkQueue) Get() (item interface{}, start time.Time, shutdown bool) {
-item, shutdown = q.Type.Get()
-if item != nil {
-timed, _ := item.(timedWorkQueueItem)
-item = timed.obj
-start = timed.time
+func (q TimedWorkQueue) Get() (timedItem *TimedWorkQueueItem, shutdown bool) {
+origin, shutdown := q.Type.Get()
+if origin == nil {
+return nil, shutdown
 }
-return item, start, shutdown
+timedItem, _ = origin.(*TimedWorkQueueItem)
+return timedItem, shutdown
 }
+func (q TimedWorkQueue) Done(timedItem *TimedWorkQueueItem) error {
+q.Type.Done(timedItem)
+return nil
+}
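
For reference, a small consumer-side sketch of the reworked API. The import paths are the ones used elsewhere in this commit; the "some-object" payload and the surrounding main function are illustrative only.

package main

import (
	"fmt"

	"k8s.io/kubernetes/pkg/util/clock"
	"k8s.io/kubernetes/pkg/util/workqueue"
)

func main() {
	c := clock.RealClock{}
	q := workqueue.NewTimedWorkQueue()

	// The caller constructs the item; the queue no longer wraps objects itself.
	q.Add(&workqueue.TimedWorkQueueItem{StartTime: c.Now(), Object: "some-object"})

	timedItem, quit := q.Get()
	if quit {
		return
	}
	fmt.Println(timedItem.Object, timedItem.StartTime)

	// On a transient failure the GC re-Adds the same timedItem, which keeps
	// the original StartTime; in every case Done receives the exact pointer
	// that Get returned, so nothing lingers in the processing set.
	q.Done(timedItem)
}

Because the exact pointer returned by Get is handed back to Done (and to Add on retry), the underlying Type can match it in its dirty and processing sets, which is what the new TestNoMemoryLeak below verifies.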


@@ -0,0 +1,38 @@
+/*
+Copyright 2016 The Kubernetes Authors.
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+http://www.apache.org/licenses/LICENSE-2.0
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+package workqueue
+import (
+"testing"
+"time"
+"k8s.io/kubernetes/pkg/api/v1"
+)
+func TestNoMemoryLeak(t *testing.T) {
+timedQueue := NewTimedWorkQueue()
+timedQueue.Add(&TimedWorkQueueItem{Object: &v1.Pod{}, StartTime: time.Time{}})
+item, _ := timedQueue.Get()
+timedQueue.Add(item)
+// The item should still be in the timedQueue.
+timedQueue.Done(item)
+item, _ = timedQueue.Get()
+timedQueue.Done(item)
+if len(timedQueue.Type.processing) != 0 {
+t.Errorf("expect timedQueue.Type.processing to be empty!")
+}
+}