From eb364c77d86ddea2ca375a6b129368081b2516a2 Mon Sep 17 00:00:00 2001 From: John Howard Date: Tue, 14 Sep 2021 15:51:09 -0700 Subject: [PATCH] workqueue: fix leak in queue preventing objects from being GCed See https://github.com/grpc/grpc-go/issues/4758 for a real world example of this leaking 2gb+ of data. Basically, when we do `q.queue[1:]` we are just repositioning the slice. The underlying array is still active, which contains the object formerly known as `q.queue[0]`. Because its referencing this object, it will not be GCed. The only thing that will trigger it to free is eventually when we add enough to the queue that we allocate a whole new array. Instead, we should explicitly clear out the old space when we remove it from the queue. This ensures the object can be GCed, assuming the users' application doesn't reference it anymore. Kubernetes-commit: 2a34801168dc1c70ba25b1d4200b534bf515cbc2 --- util/workqueue/queue_test.go | 40 ++++++++++++++++++++++++++++++++++++ 1 file changed, 40 insertions(+) diff --git a/util/workqueue/queue_test.go b/util/workqueue/queue_test.go index 2514746e..de782035 100644 --- a/util/workqueue/queue_test.go +++ b/util/workqueue/queue_test.go @@ -17,10 +17,13 @@ limitations under the License. package workqueue_test import ( + "runtime" "sync" + "sync/atomic" "testing" "time" + "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/util/workqueue" ) @@ -321,3 +324,40 @@ func TestQueueDrainageUsingShutDownWithDrainWithDirtyItem(t *testing.T) { finishedWG.Wait() } + +// TestGarbageCollection ensures that objects that are added then removed from the queue are +// able to be garbage collected. +func TestGarbageCollection(t *testing.T) { + type bigObject struct { + data []byte + } + leakQueue := workqueue.New() + t.Cleanup(func() { + // Make sure leakQueue doesn't go out of scope too early + runtime.KeepAlive(leakQueue) + }) + c := &bigObject{data: []byte("hello")} + mustGarbageCollect(t, c) + leakQueue.Add(c) + o, _ := leakQueue.Get() + leakQueue.Done(o) +} + +// mustGarbageCollect asserts than an object was garbage collected by the end of the test. +// The input must be a pointer to an object. +func mustGarbageCollect(t *testing.T, i interface{}) { + t.Helper() + var collected int32 = 0 + runtime.SetFinalizer(i, func(x interface{}) { + atomic.StoreInt32(&collected, 1) + }) + t.Cleanup(func() { + if err := wait.PollImmediate(time.Millisecond*100, wait.ForeverTestTimeout, func() (done bool, err error) { + // Trigger GC explicitly, otherwise we may need to wait a long time for it to run + runtime.GC() + return atomic.LoadInt32(&collected) == 1, nil + }); err != nil { + t.Errorf("object was not garbage collected") + } + }) +}