Merge pull request #106583 from hzxuzhonghu/automated-cherry-pick-of-#104991-#105031-origin-release-1.21

Automated cherry pick of #104991: Fix workqueue memory leak
#105031: workqueue: fix leak in queue preventing objects from being

Kubernetes-commit: 9d7f9092da28caba0e30274aa17089edeeda8e96
This commit is contained in:
Kubernetes Publisher
2022-01-14 19:46:22 -08:00
2 changed files with 44 additions and 1 deletions

View File

@@ -155,7 +155,10 @@ func (q *Type) Get() (item interface{}, shutdown bool) {
return nil, true
}
item, q.queue = q.queue[0], q.queue[1:]
item = q.queue[0]
// The underlying array still exists and reference this object, so the object will not be garbage collected.
q.queue[0] = nil
q.queue = q.queue[1:]
q.metrics.get(item)

View File

@@ -17,10 +17,13 @@ limitations under the License.
package workqueue_test
import (
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
)
@@ -159,3 +162,40 @@ func TestReinsert(t *testing.T) {
t.Errorf("Expected queue to be empty. Has %v items", a)
}
}
// TestGarbageCollection ensures that objects that are added then removed from the queue are
// able to be garbage collected.
func TestGarbageCollection(t *testing.T) {
type bigObject struct {
data []byte
}
leakQueue := workqueue.New()
t.Cleanup(func() {
// Make sure leakQueue doesn't go out of scope too early
runtime.KeepAlive(leakQueue)
})
c := &bigObject{data: []byte("hello")}
mustGarbageCollect(t, c)
leakQueue.Add(c)
o, _ := leakQueue.Get()
leakQueue.Done(o)
}
// mustGarbageCollect asserts than an object was garbage collected by the end of the test.
// The input must be a pointer to an object.
func mustGarbageCollect(t *testing.T, i interface{}) {
t.Helper()
var collected int32 = 0
runtime.SetFinalizer(i, func(x interface{}) {
atomic.StoreInt32(&collected, 1)
})
t.Cleanup(func() {
if err := wait.PollImmediate(time.Millisecond*100, wait.ForeverTestTimeout, func() (done bool, err error) {
// Trigger GC explicitly, otherwise we may need to wait a long time for it to run
runtime.GC()
return atomic.LoadInt32(&collected) == 1, nil
}); err != nil {
t.Errorf("object was not garbage collected")
}
})
}