Compare commits

..

4 Commits

Author SHA1 Message Date
Kubernetes Publisher
1b9f43a6c0 Update dependencies to v0.20.15 tag 2022-01-19 21:38:02 +00:00
Kubernetes Publisher
063236fc60 Merge pull request #106585 from hzxuzhonghu/automated-cherry-pick-of-#104991-#105031-origin-release-1.20
Automated cherry pick of #104991: Fix workqueue memory leak
#105031: workqueue: fix leak in queue preventing objects from being

Kubernetes-commit: 97bc2082ffc23c0c9656ff7d776ddfbf56510764
2022-01-14 20:02:16 -08:00
John Howard
a74b16578a workqueue: fix leak in queue preventing objects from being GCed
See https://github.com/grpc/grpc-go/issues/4758 for a real world example
of this leaking 2gb+ of data.

Basically, when we do `q.queue[1:]` we are just repositioning the slice.
The underlying array is still active, which contains the object formerly
known as `q.queue[0]`. Because its referencing this object, it will not
be GCed. The only thing that will trigger it to free is eventually when
we add enough to the queue that we allocate a whole new array.

Instead, we should explicitly clear out the old space when we remove it
from the queue. This ensures the object can be GCed, assuming the users'
application doesn't reference it anymore.

Kubernetes-commit: a94c0c21ced4fd9a7d9b736598f73c9903e336b6
2021-09-14 15:51:09 -07:00
xuzhonghu
f7fb3aa62b Fix workqueue memory leak
Kubernetes-commit: ce38e02bb58929a965476c478476d1d8957e20c0
2021-09-14 09:53:53 +08:00
4 changed files with 52 additions and 9 deletions

8
go.mod
View File

@@ -26,14 +26,14 @@ require (
golang.org/x/net v0.0.0-20201110031124-69a78807bb2b
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e
k8s.io/api v0.20.14
k8s.io/apimachinery v0.20.14
k8s.io/api v0.20.15
k8s.io/apimachinery v0.20.15
k8s.io/klog/v2 v2.4.0
k8s.io/utils v0.0.0-20201110183641-67b214c5f920
sigs.k8s.io/yaml v1.2.0
)
replace (
k8s.io/api => k8s.io/api v0.20.14
k8s.io/apimachinery => k8s.io/apimachinery v0.20.14
k8s.io/api => k8s.io/api v0.20.15
k8s.io/apimachinery => k8s.io/apimachinery v0.20.15
)

8
go.sum
View File

@@ -425,10 +425,10 @@ honnef.co/go/tools v0.0.0-20190418001031-e561f6794a2a/go.mod h1:rf3lG4BRIbNafJWh
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg=
honnef.co/go/tools v0.0.1-2020.1.3/go.mod h1:X/FiERA/W4tHapMX5mGpAtMSVEeEUOyHaw9vFzvIQ3k=
k8s.io/api v0.20.14 h1:vAI3AdDY0Ou9oAkOy/fQ3K0F+FOT+TyZqykKGKHJYQA=
k8s.io/api v0.20.14/go.mod h1:l/ErofD0cbemY3VGAuqcyQFu0+FoSYD1sOAi6PwUCis=
k8s.io/apimachinery v0.20.14 h1:LG7YY3R3ZRO5UxaIsInDk8adAb9J744CP2EfckAIM7w=
k8s.io/apimachinery v0.20.14/go.mod h1:4KFiDSxCoGviCiRk9kTXIROsIf4VSGkVYjVJjJln3pg=
k8s.io/api v0.20.15 h1:7PoPWNuE/pFFhMIQCuto88+63TIjSlCviXknxWCHLVs=
k8s.io/api v0.20.15/go.mod h1:X3JDf1BiTRQQ6xNAxTuhgi6yL2dHc6fSr9LGzE+Z3YU=
k8s.io/apimachinery v0.20.15 h1:tZW9jhDILQJq0fYXq7/t0xulj+73HzxLVBUGLCNg9uM=
k8s.io/apimachinery v0.20.15/go.mod h1:4KFiDSxCoGviCiRk9kTXIROsIf4VSGkVYjVJjJln3pg=
k8s.io/gengo v0.0.0-20200413195148-3a45101e95ac/go.mod h1:ezvh/TsK7cY6rbqRK0oQQ8IAqLxYwwyPxAX1Pzy0ii0=
k8s.io/klog/v2 v2.0.0/go.mod h1:PBfzABfn139FHAV07az/IF9Wp1bkk3vpT2XSJ76fSDE=
k8s.io/klog/v2 v2.4.0 h1:7+X0fUguPyrKEC4WjH8iGDg3laWgMo5tMnRTIGTTxGQ=

View File

@@ -149,7 +149,10 @@ func (q *Type) Get() (item interface{}, shutdown bool) {
return nil, true
}
item, q.queue = q.queue[0], q.queue[1:]
item = q.queue[0]
// The underlying array still exists and reference this object, so the object will not be garbage collected.
q.queue[0] = nil
q.queue = q.queue[1:]
q.metrics.get(item)

View File

@@ -17,10 +17,13 @@ limitations under the License.
package workqueue_test
import (
"runtime"
"sync"
"sync/atomic"
"testing"
"time"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
)
@@ -159,3 +162,40 @@ func TestReinsert(t *testing.T) {
t.Errorf("Expected queue to be empty. Has %v items", a)
}
}
// TestGarbageCollection ensures that objects that are added then removed from the queue are
// able to be garbage collected.
func TestGarbageCollection(t *testing.T) {
type bigObject struct {
data []byte
}
leakQueue := workqueue.New()
t.Cleanup(func() {
// Make sure leakQueue doesn't go out of scope too early
runtime.KeepAlive(leakQueue)
})
c := &bigObject{data: []byte("hello")}
mustGarbageCollect(t, c)
leakQueue.Add(c)
o, _ := leakQueue.Get()
leakQueue.Done(o)
}
// mustGarbageCollect asserts than an object was garbage collected by the end of the test.
// The input must be a pointer to an object.
func mustGarbageCollect(t *testing.T, i interface{}) {
t.Helper()
var collected int32 = 0
runtime.SetFinalizer(i, func(x interface{}) {
atomic.StoreInt32(&collected, 1)
})
t.Cleanup(func() {
if err := wait.PollImmediate(time.Millisecond*100, wait.ForeverTestTimeout, func() (done bool, err error) {
// Trigger GC explicitly, otherwise we may need to wait a long time for it to run
runtime.GC()
return atomic.LoadInt32(&collected) == 1, nil
}); err != nil {
t.Errorf("object was not garbage collected")
}
})
}