mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-08-08 11:38:15 +00:00
client-go/tools/cache: goroutine leak checking
Several tests leaked goroutines. All of those get fixed where possible without API changes. Goleak is used to prevent regressions. One new test specifically covers shutdown of an informer and its event handlers.
This commit is contained in:
parent
e3c584030c
commit
0ba43734b4
@ -39,6 +39,7 @@ import (
|
||||
func Example() {
|
||||
// source simulates an apiserver object endpoint.
|
||||
source := fcache.NewFakeControllerSource()
|
||||
defer source.Shutdown()
|
||||
|
||||
// This will hold the downstream state, as we know it.
|
||||
downstream := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
|
||||
@ -128,6 +129,7 @@ func Example() {
|
||||
func ExampleNewInformer() {
|
||||
// source simulates an apiserver object endpoint.
|
||||
source := fcache.NewFakeControllerSource()
|
||||
defer source.Shutdown()
|
||||
|
||||
// Let's do threadsafe output to get predictable test results.
|
||||
deletionCounter := make(chan string, 1000)
|
||||
@ -189,7 +191,7 @@ func TestHammerController(t *testing.T) {
|
||||
// race detector.
|
||||
|
||||
// source simulates an apiserver object endpoint.
|
||||
source := fcache.NewFakeControllerSource()
|
||||
source := newFakeControllerSource(t)
|
||||
|
||||
// Let's do threadsafe output to get predictable test results.
|
||||
outputSetLock := sync.Mutex{}
|
||||
@ -300,7 +302,7 @@ func TestUpdate(t *testing.T) {
|
||||
// call to update.
|
||||
|
||||
// source simulates an apiserver object endpoint.
|
||||
source := fcache.NewFakeControllerSource()
|
||||
source := newFakeControllerSource(t)
|
||||
|
||||
const (
|
||||
FROM = "from"
|
||||
@ -410,7 +412,7 @@ func TestUpdate(t *testing.T) {
|
||||
|
||||
func TestPanicPropagated(t *testing.T) {
|
||||
// source simulates an apiserver object endpoint.
|
||||
source := fcache.NewFakeControllerSource()
|
||||
source := newFakeControllerSource(t)
|
||||
|
||||
// Make a controller that just panic if the AddFunc is called.
|
||||
_, controller := NewInformer(
|
||||
@ -456,7 +458,7 @@ func TestPanicPropagated(t *testing.T) {
|
||||
|
||||
func TestTransformingInformer(t *testing.T) {
|
||||
// source simulates an apiserver object endpoint.
|
||||
source := fcache.NewFakeControllerSource()
|
||||
source := newFakeControllerSource(t)
|
||||
|
||||
makePod := func(name, generation string) *v1.Pod {
|
||||
return &v1.Pod{
|
||||
@ -578,7 +580,7 @@ func TestTransformingInformer(t *testing.T) {
|
||||
|
||||
func TestTransformingInformerRace(t *testing.T) {
|
||||
// source simulates an apiserver object endpoint.
|
||||
source := fcache.NewFakeControllerSource()
|
||||
source := newFakeControllerSource(t)
|
||||
|
||||
label := "to-be-transformed"
|
||||
makePod := func(name string) *v1.Pod {
|
||||
|
@ -17,10 +17,27 @@ limitations under the License.
|
||||
package cache
|
||||
|
||||
import (
|
||||
"os"
|
||||
"testing"
|
||||
|
||||
"go.uber.org/goleak"
|
||||
)
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
os.Exit(m.Run())
|
||||
options := []goleak.Option{
|
||||
// These tests run goroutines which get stuck in Pop.
|
||||
// This cannot be fixed without modifying the API.
|
||||
goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.TestFIFO_addReplace.func1"),
|
||||
goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.TestFIFO_addUpdate.func1"),
|
||||
goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.TestDeltaFIFO_addReplace.func1"),
|
||||
goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.TestDeltaFIFO_addUpdate.func1"),
|
||||
|
||||
// TODO: fix the following tests by adding WithContext APIs and cancellation.
|
||||
goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.TestTransformingInformerRace.func3"),
|
||||
// Created by k8s.io/client-go/tools/cache.TestReflectorListAndWatch, cannot filter on that (https://github.com/uber-go/goleak/issues/135):
|
||||
goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.(*Reflector).ListAndWatch"),
|
||||
goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.(*Reflector).startResync"),
|
||||
// ???
|
||||
goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.(*DeltaFIFO).Close"),
|
||||
}
|
||||
goleak.VerifyTestMain(m, options...)
|
||||
}
|
||||
|
@ -29,12 +29,13 @@ import (
|
||||
"github.com/google/go-cmp/cmp"
|
||||
"github.com/google/go-cmp/cmp/cmpopts"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
v1 "k8s.io/api/core/v1"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
fcache "k8s.io/client-go/tools/cache/testing"
|
||||
testingclock "k8s.io/utils/clock/testing"
|
||||
)
|
||||
|
||||
@ -123,7 +124,7 @@ func isRegistered(i SharedInformer, h ResourceEventHandlerRegistration) bool {
|
||||
func TestIndexer(t *testing.T) {
|
||||
assert := assert.New(t)
|
||||
// source simulates an apiserver object endpoint.
|
||||
source := fcache.NewFakeControllerSource()
|
||||
source := newFakeControllerSource(t)
|
||||
pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Labels: map[string]string{"a": "a-val", "b": "b-val1"}}}
|
||||
pod2 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", Labels: map[string]string{"b": "b-val2"}}}
|
||||
pod3 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod3", Labels: map[string]string{"a": "a-val2"}}}
|
||||
@ -197,7 +198,7 @@ func TestIndexer(t *testing.T) {
|
||||
|
||||
func TestListenerResyncPeriods(t *testing.T) {
|
||||
// source simulates an apiserver object endpoint.
|
||||
source := fcache.NewFakeControllerSource()
|
||||
source := newFakeControllerSource(t)
|
||||
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
|
||||
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}})
|
||||
|
||||
@ -284,7 +285,7 @@ func TestListenerResyncPeriods(t *testing.T) {
|
||||
|
||||
func TestResyncCheckPeriod(t *testing.T) {
|
||||
// source simulates an apiserver object endpoint.
|
||||
source := fcache.NewFakeControllerSource()
|
||||
source := newFakeControllerSource(t)
|
||||
|
||||
// create the shared informer and resync every 12 hours
|
||||
informer := NewSharedInformer(source, &v1.Pod{}, 12*time.Hour).(*sharedIndexInformer)
|
||||
@ -356,7 +357,7 @@ func TestResyncCheckPeriod(t *testing.T) {
|
||||
|
||||
// verify that https://github.com/kubernetes/kubernetes/issues/59822 is fixed
|
||||
func TestSharedInformerInitializationRace(t *testing.T) {
|
||||
source := fcache.NewFakeControllerSource()
|
||||
source := newFakeControllerSource(t)
|
||||
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
|
||||
listener := newTestListener("raceListener", 0)
|
||||
|
||||
@ -371,7 +372,7 @@ func TestSharedInformerInitializationRace(t *testing.T) {
|
||||
// resync and no resync see the expected state.
|
||||
func TestSharedInformerWatchDisruption(t *testing.T) {
|
||||
// source simulates an apiserver object endpoint.
|
||||
source := fcache.NewFakeControllerSource()
|
||||
source := newFakeControllerSource(t)
|
||||
|
||||
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", UID: "pod1", ResourceVersion: "1"}})
|
||||
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2", ResourceVersion: "2"}})
|
||||
@ -446,7 +447,7 @@ func TestSharedInformerWatchDisruption(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSharedInformerErrorHandling(t *testing.T) {
|
||||
source := fcache.NewFakeControllerSource()
|
||||
source := newFakeControllerSource(t)
|
||||
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
|
||||
source.ListError = fmt.Errorf("Access Denied")
|
||||
|
||||
@ -474,7 +475,7 @@ func TestSharedInformerErrorHandling(t *testing.T) {
|
||||
// TestSharedInformerStartRace is a regression test to ensure there is no race between
|
||||
// Run and SetWatchErrorHandler, and Run and SetTransform.
|
||||
func TestSharedInformerStartRace(t *testing.T) {
|
||||
source := fcache.NewFakeControllerSource()
|
||||
source := newFakeControllerSource(t)
|
||||
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
|
||||
stop := make(chan struct{})
|
||||
go func() {
|
||||
@ -500,7 +501,7 @@ func TestSharedInformerStartRace(t *testing.T) {
|
||||
|
||||
func TestSharedInformerTransformer(t *testing.T) {
|
||||
// source simulates an apiserver object endpoint.
|
||||
source := fcache.NewFakeControllerSource()
|
||||
source := newFakeControllerSource(t)
|
||||
|
||||
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", UID: "pod1", ResourceVersion: "1"}})
|
||||
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2", ResourceVersion: "2"}})
|
||||
@ -531,7 +532,7 @@ func TestSharedInformerTransformer(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSharedInformerRemoveHandler(t *testing.T) {
|
||||
source := fcache.NewFakeControllerSource()
|
||||
source := newFakeControllerSource(t)
|
||||
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
|
||||
|
||||
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second)
|
||||
@ -569,12 +570,12 @@ func TestSharedInformerRemoveHandler(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSharedInformerRemoveForeignHandler(t *testing.T) {
|
||||
source := fcache.NewFakeControllerSource()
|
||||
source := newFakeControllerSource(t)
|
||||
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
|
||||
|
||||
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
|
||||
|
||||
source2 := fcache.NewFakeControllerSource()
|
||||
source2 := newFakeControllerSource(t)
|
||||
source2.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
|
||||
|
||||
informer2 := NewSharedInformer(source2, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
|
||||
@ -651,7 +652,7 @@ func TestSharedInformerRemoveForeignHandler(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestSharedInformerMultipleRegistration(t *testing.T) {
|
||||
source := fcache.NewFakeControllerSource()
|
||||
source := newFakeControllerSource(t)
|
||||
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
|
||||
|
||||
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
|
||||
@ -719,7 +720,7 @@ func TestSharedInformerMultipleRegistration(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestRemovingRemovedSharedInformer(t *testing.T) {
|
||||
source := fcache.NewFakeControllerSource()
|
||||
source := newFakeControllerSource(t)
|
||||
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
|
||||
|
||||
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
|
||||
@ -751,7 +752,7 @@ func TestRemovingRemovedSharedInformer(t *testing.T) {
|
||||
// listeners without tripping it up. There are not really many assertions in this
|
||||
// test. Meant to be run with -race to find race conditions
|
||||
func TestSharedInformerHandlerAbuse(t *testing.T) {
|
||||
source := fcache.NewFakeControllerSource()
|
||||
source := newFakeControllerSource(t)
|
||||
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
@ -865,7 +866,7 @@ func TestSharedInformerHandlerAbuse(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestStateSharedInformer(t *testing.T) {
|
||||
source := fcache.NewFakeControllerSource()
|
||||
source := newFakeControllerSource(t)
|
||||
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
|
||||
|
||||
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
|
||||
@ -914,7 +915,7 @@ func TestStateSharedInformer(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestAddOnStoppedSharedInformer(t *testing.T) {
|
||||
source := fcache.NewFakeControllerSource()
|
||||
source := newFakeControllerSource(t)
|
||||
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
|
||||
|
||||
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
|
||||
@ -947,7 +948,7 @@ func TestAddOnStoppedSharedInformer(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestRemoveOnStoppedSharedInformer(t *testing.T) {
|
||||
source := fcache.NewFakeControllerSource()
|
||||
source := newFakeControllerSource(t)
|
||||
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
|
||||
|
||||
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
|
||||
@ -976,7 +977,7 @@ func TestRemoveOnStoppedSharedInformer(t *testing.T) {
|
||||
|
||||
func TestRemoveWhileActive(t *testing.T) {
|
||||
// source simulates an apiserver object endpoint.
|
||||
source := fcache.NewFakeControllerSource()
|
||||
source := newFakeControllerSource(t)
|
||||
|
||||
// create the shared informer and resync every 12 hours
|
||||
informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer)
|
||||
@ -1012,7 +1013,7 @@ func TestRemoveWhileActive(t *testing.T) {
|
||||
|
||||
func TestAddWhileActive(t *testing.T) {
|
||||
// source simulates an apiserver object endpoint.
|
||||
source := fcache.NewFakeControllerSource()
|
||||
source := newFakeControllerSource(t)
|
||||
|
||||
// create the shared informer and resync every 12 hours
|
||||
informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer)
|
||||
@ -1072,3 +1073,52 @@ func TestAddWhileActive(t *testing.T) {
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// TestShutdown depends on goleak.VerifyTestMain in main_test.go to verify that
|
||||
// all goroutines really have stopped in the different scenarios.
|
||||
func TestShutdown(t *testing.T) {
|
||||
t.Run("no-context", func(t *testing.T) {
|
||||
source := newFakeControllerSource(t)
|
||||
stop := make(chan struct{})
|
||||
defer close(stop)
|
||||
|
||||
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second)
|
||||
handler, err := informer.AddEventHandler(ResourceEventHandlerFuncs{
|
||||
AddFunc: func(_ any) {},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
defer func() {
|
||||
assert.NoError(t, informer.RemoveEventHandler(handler))
|
||||
}()
|
||||
go informer.Run(stop)
|
||||
require.Eventually(t, informer.HasSynced, time.Minute, time.Millisecond, "informer has synced")
|
||||
})
|
||||
|
||||
t.Run("no-context-later", func(t *testing.T) {
|
||||
source := newFakeControllerSource(t)
|
||||
stop := make(chan struct{})
|
||||
defer close(stop)
|
||||
|
||||
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second)
|
||||
go informer.Run(stop)
|
||||
require.Eventually(t, informer.HasSynced, time.Minute, time.Millisecond, "informer has synced")
|
||||
|
||||
handler, err := informer.AddEventHandler(ResourceEventHandlerFuncs{
|
||||
AddFunc: func(_ any) {},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
assert.NoError(t, informer.RemoveEventHandler(handler))
|
||||
})
|
||||
|
||||
t.Run("no-run", func(t *testing.T) {
|
||||
source := newFakeControllerSource(t)
|
||||
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second)
|
||||
_, err := informer.AddEventHandler(ResourceEventHandlerFuncs{
|
||||
AddFunc: func(_ any) {},
|
||||
})
|
||||
require.NoError(t, err)
|
||||
|
||||
// At this point, neither informer nor handler have any goroutines running
|
||||
// and it doesn't matter that nothing gets stopped or removed.
|
||||
})
|
||||
}
|
||||
|
29
staging/src/k8s.io/client-go/tools/cache/util_test.go
vendored
Normal file
29
staging/src/k8s.io/client-go/tools/cache/util_test.go
vendored
Normal file
@ -0,0 +1,29 @@
|
||||
/*
|
||||
Copyright 2024 The Kubernetes Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package cache
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
fcache "k8s.io/client-go/tools/cache/testing"
|
||||
)
|
||||
|
||||
func newFakeControllerSource(tb testing.TB) *fcache.FakeControllerSource {
|
||||
source := fcache.NewFakeControllerSource()
|
||||
tb.Cleanup(source.Shutdown)
|
||||
return source
|
||||
}
|
@ -83,6 +83,8 @@ github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
|
||||
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
|
||||
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
|
||||
|
1
staging/src/k8s.io/component-helpers/go.sum
generated
1
staging/src/k8s.io/component-helpers/go.sum
generated
@ -94,6 +94,7 @@ github.com/x448/float16 v0.8.4 h1:qLwI1I70+NjRFUR3zs1JPUCgaCXSh3SW62uAKT1mSBM=
|
||||
github.com/x448/float16 v0.8.4/go.mod h1:14CWIYCyZA/cWjXOioeEpHeN/83MdbZDRQHoFcYsOfg=
|
||||
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
|
||||
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
|
||||
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
|
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
|
||||
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
|
||||
|
Loading…
Reference in New Issue
Block a user