From b836a27b07b1f662c36e5841f24d21d90cb621d1 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Thu, 28 Nov 2024 17:59:36 +0100 Subject: [PATCH] client-go/tools/cache: goroutine leak checking Several tests leaked goroutines. All of those get fixed where possible without API changes. Goleak is used to prevent regressions. One new test specifically covers shutdown of an informer and its event handlers. Kubernetes-commit: 0ba43734b4c8998b4aaeb1fa2bec8dee609fa50a --- tools/cache/controller_test.go | 12 ++-- tools/cache/main_test.go | 21 ++++++- tools/cache/shared_informer_test.go | 90 ++++++++++++++++++++++------- tools/cache/util_test.go | 29 ++++++++++ 4 files changed, 125 insertions(+), 27 deletions(-) create mode 100644 tools/cache/util_test.go diff --git a/tools/cache/controller_test.go b/tools/cache/controller_test.go index 992c1f5a..232585c8 100644 --- a/tools/cache/controller_test.go +++ b/tools/cache/controller_test.go @@ -39,6 +39,7 @@ import ( func Example() { // source simulates an apiserver object endpoint. source := fcache.NewFakeControllerSource() + defer source.Shutdown() // This will hold the downstream state, as we know it. downstream := NewStore(DeletionHandlingMetaNamespaceKeyFunc) @@ -128,6 +129,7 @@ func Example() { func ExampleNewInformer() { // source simulates an apiserver object endpoint. source := fcache.NewFakeControllerSource() + defer source.Shutdown() // Let's do threadsafe output to get predictable test results. deletionCounter := make(chan string, 1000) @@ -189,7 +191,7 @@ func TestHammerController(t *testing.T) { // race detector. // source simulates an apiserver object endpoint. - source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) // Let's do threadsafe output to get predictable test results. outputSetLock := sync.Mutex{} @@ -300,7 +302,7 @@ func TestUpdate(t *testing.T) { // call to update. // source simulates an apiserver object endpoint. 
- source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) const ( FROM = "from" @@ -410,7 +412,7 @@ func TestUpdate(t *testing.T) { func TestPanicPropagated(t *testing.T) { // source simulates an apiserver object endpoint. - source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) // Make a controller that just panic if the AddFunc is called. _, controller := NewInformer( @@ -456,7 +458,7 @@ func TestPanicPropagated(t *testing.T) { func TestTransformingInformer(t *testing.T) { // source simulates an apiserver object endpoint. - source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) makePod := func(name, generation string) *v1.Pod { return &v1.Pod{ @@ -578,7 +580,7 @@ func TestTransformingInformer(t *testing.T) { func TestTransformingInformerRace(t *testing.T) { // source simulates an apiserver object endpoint. - source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) label := "to-be-transformed" makePod := func(name string) *v1.Pod { diff --git a/tools/cache/main_test.go b/tools/cache/main_test.go index abbfb0b5..12817a07 100644 --- a/tools/cache/main_test.go +++ b/tools/cache/main_test.go @@ -17,10 +17,27 @@ limitations under the License. package cache import ( - "os" "testing" + + "go.uber.org/goleak" ) func TestMain(m *testing.M) { - os.Exit(m.Run()) + options := []goleak.Option{ + // These tests run goroutines which get stuck in Pop. + // This cannot be fixed without modifying the API. + goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.TestFIFO_addReplace.func1"), + goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.TestFIFO_addUpdate.func1"), + goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.TestDeltaFIFO_addReplace.func1"), + goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.TestDeltaFIFO_addUpdate.func1"), + + // TODO: fix the following tests by adding WithContext APIs and cancellation. 
+ goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.TestTransformingInformerRace.func3"), + // Created by k8s.io/client-go/tools/cache.TestReflectorListAndWatch, cannot filter on that (https://github.com/uber-go/goleak/issues/135): + goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.(*Reflector).ListAndWatch"), + goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.(*Reflector).startResync"), + // TODO: unclear which test leaves behind a goroutine with (*DeltaFIFO).Close on its stack; investigate and remove this exception. + goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.(*DeltaFIFO).Close"), + } + goleak.VerifyTestMain(m, options...) } diff --git a/tools/cache/shared_informer_test.go b/tools/cache/shared_informer_test.go index 1e4ed9d5..a8f5f074 100644 --- a/tools/cache/shared_informer_test.go +++ b/tools/cache/shared_informer_test.go @@ -29,12 +29,13 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" - fcache "k8s.io/client-go/tools/cache/testing" testingclock "k8s.io/utils/clock/testing" ) @@ -123,7 +124,7 @@ func isRegistered(i SharedInformer, h ResourceEventHandlerRegistration) bool { func TestIndexer(t *testing.T) { assert := assert.New(t) // source simulates an apiserver object endpoint. - source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Labels: map[string]string{"a": "a-val", "b": "b-val1"}}} pod2 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", Labels: map[string]string{"b": "b-val2"}}} pod3 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod3", Labels: map[string]string{"a": "a-val2"}}} @@ -197,7 +198,7 @@ func TestIndexer(t *testing.T) { func TestListenerResyncPeriods(t *testing.T) { // source simulates an apiserver object endpoint. 
- source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}}) @@ -284,7 +285,7 @@ func TestListenerResyncPeriods(t *testing.T) { func TestResyncCheckPeriod(t *testing.T) { // source simulates an apiserver object endpoint. - source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) // create the shared informer and resync every 12 hours informer := NewSharedInformer(source, &v1.Pod{}, 12*time.Hour).(*sharedIndexInformer) @@ -356,7 +357,7 @@ func TestResyncCheckPeriod(t *testing.T) { // verify that https://github.com/kubernetes/kubernetes/issues/59822 is fixed func TestSharedInformerInitializationRace(t *testing.T) { - source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) listener := newTestListener("raceListener", 0) @@ -371,7 +372,7 @@ func TestSharedInformerInitializationRace(t *testing.T) { // resync and no resync see the expected state. func TestSharedInformerWatchDisruption(t *testing.T) { // source simulates an apiserver object endpoint. 
- source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", UID: "pod1", ResourceVersion: "1"}}) source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2", ResourceVersion: "2"}}) @@ -446,7 +447,7 @@ func TestSharedInformerWatchDisruption(t *testing.T) { } func TestSharedInformerErrorHandling(t *testing.T) { - source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) source.ListError = fmt.Errorf("Access Denied") @@ -474,7 +475,7 @@ func TestSharedInformerErrorHandling(t *testing.T) { // TestSharedInformerStartRace is a regression test to ensure there is no race between // Run and SetWatchErrorHandler, and Run and SetTransform. func TestSharedInformerStartRace(t *testing.T) { - source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) stop := make(chan struct{}) go func() { @@ -500,7 +501,7 @@ func TestSharedInformerStartRace(t *testing.T) { func TestSharedInformerTransformer(t *testing.T) { // source simulates an apiserver object endpoint. 
- source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", UID: "pod1", ResourceVersion: "1"}}) source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2", ResourceVersion: "2"}}) @@ -531,7 +532,7 @@ func TestSharedInformerTransformer(t *testing.T) { } func TestSharedInformerRemoveHandler(t *testing.T) { - source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second) @@ -569,12 +570,12 @@ func TestSharedInformerRemoveHandler(t *testing.T) { } func TestSharedInformerRemoveForeignHandler(t *testing.T) { - source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) - source2 := fcache.NewFakeControllerSource() + source2 := newFakeControllerSource(t) source2.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) informer2 := NewSharedInformer(source2, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) @@ -651,7 +652,7 @@ func TestSharedInformerRemoveForeignHandler(t *testing.T) { } func TestSharedInformerMultipleRegistration(t *testing.T) { - source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) @@ -719,7 +720,7 @@ func TestSharedInformerMultipleRegistration(t *testing.T) { } func TestRemovingRemovedSharedInformer(t *testing.T) { - source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) @@ -751,7 
+752,7 @@ func TestRemovingRemovedSharedInformer(t *testing.T) { // listeners without tripping it up. There are not really many assertions in this // test. Meant to be run with -race to find race conditions func TestSharedInformerHandlerAbuse(t *testing.T) { - source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) ctx, cancel := context.WithCancel(context.Background()) @@ -865,7 +866,7 @@ func TestSharedInformerHandlerAbuse(t *testing.T) { } func TestStateSharedInformer(t *testing.T) { - source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) @@ -914,7 +915,7 @@ func TestStateSharedInformer(t *testing.T) { } func TestAddOnStoppedSharedInformer(t *testing.T) { - source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) @@ -947,7 +948,7 @@ func TestAddOnStoppedSharedInformer(t *testing.T) { } func TestRemoveOnStoppedSharedInformer(t *testing.T) { - source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) @@ -976,7 +977,7 @@ func TestRemoveOnStoppedSharedInformer(t *testing.T) { func TestRemoveWhileActive(t *testing.T) { // source simulates an apiserver object endpoint. 
- source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) // create the shared informer and resync every 12 hours informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer) @@ -1012,7 +1013,7 @@ func TestRemoveWhileActive(t *testing.T) { func TestAddWhileActive(t *testing.T) { // source simulates an apiserver object endpoint. - source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) // create the shared informer and resync every 12 hours informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer) @@ -1072,3 +1073,52 @@ func TestAddWhileActive(t *testing.T) { return } } + +// TestShutdown depends on goleak.VerifyTestMain in main_test.go to verify that +// all goroutines really have stopped in the different scenarios. +func TestShutdown(t *testing.T) { + t.Run("no-context", func(t *testing.T) { + source := newFakeControllerSource(t) + stop := make(chan struct{}) + defer close(stop) + + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second) + handler, err := informer.AddEventHandler(ResourceEventHandlerFuncs{ + AddFunc: func(_ any) {}, + }) + require.NoError(t, err) + defer func() { + assert.NoError(t, informer.RemoveEventHandler(handler)) + }() + go informer.Run(stop) + require.Eventually(t, informer.HasSynced, time.Minute, time.Millisecond, "informer has synced") + }) + + t.Run("no-context-later", func(t *testing.T) { + source := newFakeControllerSource(t) + stop := make(chan struct{}) + defer close(stop) + + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second) + go informer.Run(stop) + require.Eventually(t, informer.HasSynced, time.Minute, time.Millisecond, "informer has synced") + + handler, err := informer.AddEventHandler(ResourceEventHandlerFuncs{ + AddFunc: func(_ any) {}, + }) + require.NoError(t, err) + assert.NoError(t, informer.RemoveEventHandler(handler)) + }) + + t.Run("no-run", func(t *testing.T) { + source := newFakeControllerSource(t) + informer := 
NewSharedInformer(source, &v1.Pod{}, 1*time.Second) + _, err := informer.AddEventHandler(ResourceEventHandlerFuncs{ + AddFunc: func(_ any) {}, + }) + require.NoError(t, err) + + // At this point, neither informer nor handler have any goroutines running + // and it doesn't matter that nothing gets stopped or removed. + }) +} diff --git a/tools/cache/util_test.go b/tools/cache/util_test.go new file mode 100644 index 00000000..6d073ca7 --- /dev/null +++ b/tools/cache/util_test.go @@ -0,0 +1,29 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "testing" + + fcache "k8s.io/client-go/tools/cache/testing" +) + +func newFakeControllerSource(tb testing.TB) *fcache.FakeControllerSource { + source := fcache.NewFakeControllerSource() + tb.Cleanup(source.Shutdown) + return source +}