workqueue: fix leak in queue preventing objects from being GCed

See https://github.com/grpc/grpc-go/issues/4758 for a real world example
of this leaking 2gb+ of data.

Basically, when we do `q.queue[1:]` we are just repositioning the slice.
The underlying array is still active, which contains the object formerly
known as `q.queue[0]`. Because its referencing this object, it will not
be GCed. The only thing that will trigger it to free is eventually when
we add enough to the queue that we allocate a whole new array.

Instead, we should explicitly clear out the old space when we remove it
from the queue. This ensures the object can be GCed, assuming the users'
application doesn't reference it anymore.

Kubernetes-commit: fe4e0cb3f518144b8892414992350783f2f20c07
This commit is contained in:
John Howard
2021-09-14 15:51:09 -07:00
committed by Kubernetes Publisher
parent 7f47bbe3fe
commit 973eb806bc

View File

@@ -17,10 +17,13 @@ limitations under the License.
package workqueue_test
import (
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
)
@@ -159,3 +162,40 @@ func TestReinsert(t *testing.T) {
t.Errorf("Expected queue to be empty. Has %v items", a)
}
}
// TestGarbageCollection ensures that objects that are added then removed from the queue are
// able to be garbage collected.
func TestGarbageCollection(t *testing.T) {
type bigObject struct {
data []byte
}
leakQueue := workqueue.New()
t.Cleanup(func() {
// Make sure leakQueue doesn't go out of scope too early
runtime.KeepAlive(leakQueue)
})
c := &bigObject{data: []byte("hello")}
mustGarbageCollect(t, c)
leakQueue.Add(c)
o, _ := leakQueue.Get()
leakQueue.Done(o)
}
// mustGarbageCollect asserts than an object was garbage collected by the end of the test.
// The input must be a pointer to an object.
func mustGarbageCollect(t *testing.T, i interface{}) {
t.Helper()
var collected int32 = 0
runtime.SetFinalizer(i, func(x interface{}) {
atomic.StoreInt32(&collected, 1)
})
t.Cleanup(func() {
if err := wait.PollImmediate(time.Millisecond*100, wait.ForeverTestTimeout, func() (done bool, err error) {
// Trigger GC explicitly, otherwise we may need to wait a long time for it to run
runtime.GC()
return atomic.LoadInt32(&collected) == 1, nil
}); err != nil {
t.Errorf("object was not garbage collected")
}
})
}