diff --git a/tools/cache/controller_test.go b/tools/cache/controller_test.go index 992c1f5a..232585c8 100644 --- a/tools/cache/controller_test.go +++ b/tools/cache/controller_test.go @@ -39,6 +39,7 @@ import ( func Example() { // source simulates an apiserver object endpoint. source := fcache.NewFakeControllerSource() + defer source.Shutdown() // This will hold the downstream state, as we know it. downstream := NewStore(DeletionHandlingMetaNamespaceKeyFunc) @@ -128,6 +129,7 @@ func Example() { func ExampleNewInformer() { // source simulates an apiserver object endpoint. source := fcache.NewFakeControllerSource() + defer source.Shutdown() // Let's do threadsafe output to get predictable test results. deletionCounter := make(chan string, 1000) @@ -189,7 +191,7 @@ func TestHammerController(t *testing.T) { // race detector. // source simulates an apiserver object endpoint. - source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) // Let's do threadsafe output to get predictable test results. outputSetLock := sync.Mutex{} @@ -300,7 +302,7 @@ func TestUpdate(t *testing.T) { // call to update. // source simulates an apiserver object endpoint. - source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) const ( FROM = "from" @@ -410,7 +412,7 @@ func TestUpdate(t *testing.T) { func TestPanicPropagated(t *testing.T) { // source simulates an apiserver object endpoint. - source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) // Make a controller that just panic if the AddFunc is called. _, controller := NewInformer( @@ -456,7 +458,7 @@ func TestPanicPropagated(t *testing.T) { func TestTransformingInformer(t *testing.T) { // source simulates an apiserver object endpoint. - source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) makePod := func(name, generation string) *v1.Pod { return &v1.Pod{ @@ -578,7 +580,7 @@ func TestTransformingInformer(t *testing.T) { func TestTransformingInformerRace(t *testing.T) { // source simulates an apiserver object endpoint. - source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) label := "to-be-transformed" makePod := func(name string) *v1.Pod { diff --git a/tools/cache/main_test.go b/tools/cache/main_test.go index abbfb0b5..12817a07 100644 --- a/tools/cache/main_test.go +++ b/tools/cache/main_test.go @@ -17,10 +17,27 @@ limitations under the License. package cache import ( - "os" "testing" + + "go.uber.org/goleak" ) func TestMain(m *testing.M) { - os.Exit(m.Run()) + options := []goleak.Option{ + // These tests run goroutines which get stuck in Pop. + // This cannot be fixed without modifying the API. + goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.TestFIFO_addReplace.func1"), + goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.TestFIFO_addUpdate.func1"), + goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.TestDeltaFIFO_addReplace.func1"), + goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.TestDeltaFIFO_addUpdate.func1"), + + // TODO: fix the following tests by adding WithContext APIs and cancellation. + goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.TestTransformingInformerRace.func3"), + // Created by k8s.io/client-go/tools/cache.TestReflectorListAndWatch, cannot filter on that (https://github.com/uber-go/goleak/issues/135): + goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.(*Reflector).ListAndWatch"), + goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.(*Reflector).startResync"), + // ??? + goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.(*DeltaFIFO).Close"), + } + goleak.VerifyTestMain(m, options...) } diff --git a/tools/cache/shared_informer_test.go b/tools/cache/shared_informer_test.go index 1e4ed9d5..a8f5f074 100644 --- a/tools/cache/shared_informer_test.go +++ b/tools/cache/shared_informer_test.go @@ -29,12 +29,13 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/wait" - fcache "k8s.io/client-go/tools/cache/testing" testingclock "k8s.io/utils/clock/testing" ) @@ -123,7 +124,7 @@ func isRegistered(i SharedInformer, h ResourceEventHandlerRegistration) bool { func TestIndexer(t *testing.T) { assert := assert.New(t) // source simulates an apiserver object endpoint. - source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Labels: map[string]string{"a": "a-val", "b": "b-val1"}}} pod2 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", Labels: map[string]string{"b": "b-val2"}}} pod3 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod3", Labels: map[string]string{"a": "a-val2"}}} @@ -197,7 +198,7 @@ func TestIndexer(t *testing.T) { func TestListenerResyncPeriods(t *testing.T) { // source simulates an apiserver object endpoint. - source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}}) @@ -284,7 +285,7 @@ func TestListenerResyncPeriods(t *testing.T) { func TestResyncCheckPeriod(t *testing.T) { // source simulates an apiserver object endpoint. - source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) // create the shared informer and resync every 12 hours informer := NewSharedInformer(source, &v1.Pod{}, 12*time.Hour).(*sharedIndexInformer) @@ -356,7 +357,7 @@ func TestResyncCheckPeriod(t *testing.T) { // verify that https://github.com/kubernetes/kubernetes/issues/59822 is fixed func TestSharedInformerInitializationRace(t *testing.T) { - source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) listener := newTestListener("raceListener", 0) @@ -371,7 +372,7 @@ func TestSharedInformerInitializationRace(t *testing.T) { // resync and no resync see the expected state. func TestSharedInformerWatchDisruption(t *testing.T) { // source simulates an apiserver object endpoint. - source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", UID: "pod1", ResourceVersion: "1"}}) source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2", ResourceVersion: "2"}}) @@ -446,7 +447,7 @@ func TestSharedInformerWatchDisruption(t *testing.T) { } func TestSharedInformerErrorHandling(t *testing.T) { - source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) source.ListError = fmt.Errorf("Access Denied") @@ -474,7 +475,7 @@ func TestSharedInformerErrorHandling(t *testing.T) { // TestSharedInformerStartRace is a regression test to ensure there is no race between // Run and SetWatchErrorHandler, and Run and SetTransform. func TestSharedInformerStartRace(t *testing.T) { - source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) stop := make(chan struct{}) go func() { @@ -500,7 +501,7 @@ func TestSharedInformerStartRace(t *testing.T) { func TestSharedInformerTransformer(t *testing.T) { // source simulates an apiserver object endpoint. - source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", UID: "pod1", ResourceVersion: "1"}}) source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2", ResourceVersion: "2"}}) @@ -531,7 +532,7 @@ func TestSharedInformerTransformer(t *testing.T) { } func TestSharedInformerRemoveHandler(t *testing.T) { - source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second) @@ -569,12 +570,12 @@ func TestSharedInformerRemoveHandler(t *testing.T) { } func TestSharedInformerRemoveForeignHandler(t *testing.T) { - source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) - source2 := fcache.NewFakeControllerSource() + source2 := newFakeControllerSource(t) source2.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) informer2 := NewSharedInformer(source2, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) @@ -651,7 +652,7 @@ func TestSharedInformerRemoveForeignHandler(t *testing.T) { } func TestSharedInformerMultipleRegistration(t *testing.T) { - source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) @@ -719,7 +720,7 @@ func TestSharedInformerMultipleRegistration(t *testing.T) { } func TestRemovingRemovedSharedInformer(t *testing.T) { - source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) @@ -751,7 +752,7 @@ func TestRemovingRemovedSharedInformer(t *testing.T) { // listeners without tripping it up. There are not really many assertions in this // test. Meant to be run with -race to find race conditions func TestSharedInformerHandlerAbuse(t *testing.T) { - source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) ctx, cancel := context.WithCancel(context.Background()) @@ -865,7 +866,7 @@ func TestSharedInformerHandlerAbuse(t *testing.T) { } func TestStateSharedInformer(t *testing.T) { - source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) @@ -914,7 +915,7 @@ func TestStateSharedInformer(t *testing.T) { } func TestAddOnStoppedSharedInformer(t *testing.T) { - source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) @@ -947,7 +948,7 @@ func TestAddOnStoppedSharedInformer(t *testing.T) { } func TestRemoveOnStoppedSharedInformer(t *testing.T) { - source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}}) informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer) @@ -976,7 +977,7 @@ func TestRemoveOnStoppedSharedInformer(t *testing.T) { func TestRemoveWhileActive(t *testing.T) { // source simulates an apiserver object endpoint. - source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) // create the shared informer and resync every 12 hours informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer) @@ -1012,7 +1013,7 @@ func TestRemoveWhileActive(t *testing.T) { func TestAddWhileActive(t *testing.T) { // source simulates an apiserver object endpoint. - source := fcache.NewFakeControllerSource() + source := newFakeControllerSource(t) // create the shared informer and resync every 12 hours informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer) @@ -1072,3 +1073,52 @@ func TestAddWhileActive(t *testing.T) { return } } + +// TestShutdown depends on goleak.VerifyTestMain in main_test.go to verify that +// all goroutines really have stopped in the different scenarios. +func TestShutdown(t *testing.T) { + t.Run("no-context", func(t *testing.T) { + source := newFakeControllerSource(t) + stop := make(chan struct{}) + defer close(stop) + + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second) + handler, err := informer.AddEventHandler(ResourceEventHandlerFuncs{ + AddFunc: func(_ any) {}, + }) + require.NoError(t, err) + defer func() { + assert.NoError(t, informer.RemoveEventHandler(handler)) + }() + go informer.Run(stop) + require.Eventually(t, informer.HasSynced, time.Minute, time.Millisecond, "informer has synced") + }) + + t.Run("no-context-later", func(t *testing.T) { + source := newFakeControllerSource(t) + stop := make(chan struct{}) + defer close(stop) + + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second) + go informer.Run(stop) + require.Eventually(t, informer.HasSynced, time.Minute, time.Millisecond, "informer has synced") + + handler, err := informer.AddEventHandler(ResourceEventHandlerFuncs{ + AddFunc: func(_ any) {}, + }) + require.NoError(t, err) + assert.NoError(t, informer.RemoveEventHandler(handler)) + }) + + t.Run("no-run", func(t *testing.T) { + source := newFakeControllerSource(t) + informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second) + _, err := informer.AddEventHandler(ResourceEventHandlerFuncs{ + AddFunc: func(_ any) {}, + }) + require.NoError(t, err) + + // At this point, neither informer nor handler have any goroutines running + // and it doesn't matter that nothing gets stopped or removed. + }) +} diff --git a/tools/cache/util_test.go b/tools/cache/util_test.go new file mode 100644 index 00000000..6d073ca7 --- /dev/null +++ b/tools/cache/util_test.go @@ -0,0 +1,29 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package cache + +import ( + "testing" + + fcache "k8s.io/client-go/tools/cache/testing" +) + +func newFakeControllerSource(tb testing.TB) *fcache.FakeControllerSource { + source := fcache.NewFakeControllerSource() + tb.Cleanup(source.Shutdown) + return source +}