mirror of
https://github.com/kubernetes/client-go.git
synced 2025-06-25 14:41:53 +00:00
workqueue: fix leak in queue preventing objects from being GCed
See https://github.com/grpc/grpc-go/issues/4758 for a real world example of this leaking 2gb+ of data. Basically, when we do `q.queue[1:]` we are just repositioning the slice. The underlying array is still active, which contains the object formerly known as `q.queue[0]`. Because its referencing this object, it will not be GCed. The only thing that will trigger it to free is eventually when we add enough to the queue that we allocate a whole new array. Instead, we should explicitly clear out the old space when we remove it from the queue. This ensures the object can be GCed, assuming the users' application doesn't reference it anymore. Kubernetes-commit: 2a34801168dc1c70ba25b1d4200b534bf515cbc2
This commit is contained in:
parent
98470c83bc
commit
eb364c77d8
@ -17,10 +17,13 @@ limitations under the License.
|
||||
package workqueue_test
|
||||
|
||||
import (
|
||||
"runtime"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
)
|
||||
|
||||
@ -321,3 +324,40 @@ func TestQueueDrainageUsingShutDownWithDrainWithDirtyItem(t *testing.T) {
|
||||
|
||||
finishedWG.Wait()
|
||||
}
|
||||
|
||||
// TestGarbageCollection ensures that objects that are added then removed from the queue are
|
||||
// able to be garbage collected.
|
||||
func TestGarbageCollection(t *testing.T) {
|
||||
type bigObject struct {
|
||||
data []byte
|
||||
}
|
||||
leakQueue := workqueue.New()
|
||||
t.Cleanup(func() {
|
||||
// Make sure leakQueue doesn't go out of scope too early
|
||||
runtime.KeepAlive(leakQueue)
|
||||
})
|
||||
c := &bigObject{data: []byte("hello")}
|
||||
mustGarbageCollect(t, c)
|
||||
leakQueue.Add(c)
|
||||
o, _ := leakQueue.Get()
|
||||
leakQueue.Done(o)
|
||||
}
|
||||
|
||||
// mustGarbageCollect asserts than an object was garbage collected by the end of the test.
|
||||
// The input must be a pointer to an object.
|
||||
func mustGarbageCollect(t *testing.T, i interface{}) {
|
||||
t.Helper()
|
||||
var collected int32 = 0
|
||||
runtime.SetFinalizer(i, func(x interface{}) {
|
||||
atomic.StoreInt32(&collected, 1)
|
||||
})
|
||||
t.Cleanup(func() {
|
||||
if err := wait.PollImmediate(time.Millisecond*100, wait.ForeverTestTimeout, func() (done bool, err error) {
|
||||
// Trigger GC explicitly, otherwise we may need to wait a long time for it to run
|
||||
runtime.GC()
|
||||
return atomic.LoadInt32(&collected) == 1, nil
|
||||
}); err != nil {
|
||||
t.Errorf("object was not garbage collected")
|
||||
}
|
||||
})
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user