client-go/tools/cache: goroutine leak checking

Several tests leaked goroutines. All of those get fixed where possible
without API changes. Goleak is used to prevent regressions.

One new test specifically covers shutdown of an informer and its event
handlers.

Kubernetes-commit: 0ba43734b4c8998b4aaeb1fa2bec8dee609fa50a
This commit is contained in:
Patrick Ohly 2024-11-28 17:59:36 +01:00 committed by Kubernetes Publisher
parent 67da6d1a41
commit b836a27b07
4 changed files with 125 additions and 27 deletions

View File

@ -39,6 +39,7 @@ import (
func Example() {
// source simulates an apiserver object endpoint.
source := fcache.NewFakeControllerSource()
defer source.Shutdown()
// This will hold the downstream state, as we know it.
downstream := NewStore(DeletionHandlingMetaNamespaceKeyFunc)
@ -128,6 +129,7 @@ func Example() {
func ExampleNewInformer() {
// source simulates an apiserver object endpoint.
source := fcache.NewFakeControllerSource()
defer source.Shutdown()
// Let's do threadsafe output to get predictable test results.
deletionCounter := make(chan string, 1000)
@ -189,7 +191,7 @@ func TestHammerController(t *testing.T) {
// race detector.
// source simulates an apiserver object endpoint.
source := fcache.NewFakeControllerSource()
source := newFakeControllerSource(t)
// Let's do threadsafe output to get predictable test results.
outputSetLock := sync.Mutex{}
@ -300,7 +302,7 @@ func TestUpdate(t *testing.T) {
// call to update.
// source simulates an apiserver object endpoint.
source := fcache.NewFakeControllerSource()
source := newFakeControllerSource(t)
const (
FROM = "from"
@ -410,7 +412,7 @@ func TestUpdate(t *testing.T) {
func TestPanicPropagated(t *testing.T) {
// source simulates an apiserver object endpoint.
source := fcache.NewFakeControllerSource()
source := newFakeControllerSource(t)
// Make a controller that just panic if the AddFunc is called.
_, controller := NewInformer(
@ -456,7 +458,7 @@ func TestPanicPropagated(t *testing.T) {
func TestTransformingInformer(t *testing.T) {
// source simulates an apiserver object endpoint.
source := fcache.NewFakeControllerSource()
source := newFakeControllerSource(t)
makePod := func(name, generation string) *v1.Pod {
return &v1.Pod{
@ -578,7 +580,7 @@ func TestTransformingInformer(t *testing.T) {
func TestTransformingInformerRace(t *testing.T) {
// source simulates an apiserver object endpoint.
source := fcache.NewFakeControllerSource()
source := newFakeControllerSource(t)
label := "to-be-transformed"
makePod := func(name string) *v1.Pod {

View File

@ -17,10 +17,27 @@ limitations under the License.
package cache
import (
"os"
"testing"
"go.uber.org/goleak"
)
func TestMain(m *testing.M) {
os.Exit(m.Run())
options := []goleak.Option{
// These tests run goroutines which get stuck in Pop.
// This cannot be fixed without modifying the API.
goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.TestFIFO_addReplace.func1"),
goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.TestFIFO_addUpdate.func1"),
goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.TestDeltaFIFO_addReplace.func1"),
goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.TestDeltaFIFO_addUpdate.func1"),
// TODO: fix the following tests by adding WithContext APIs and cancellation.
goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.TestTransformingInformerRace.func3"),
// Created by k8s.io/client-go/tools/cache.TestReflectorListAndWatch, cannot filter on that (https://github.com/uber-go/goleak/issues/135):
goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.(*Reflector).ListAndWatch"),
goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.(*Reflector).startResync"),
// ???
goleak.IgnoreAnyFunction("k8s.io/client-go/tools/cache.(*DeltaFIFO).Close"),
}
goleak.VerifyTestMain(m, options...)
}

View File

@ -29,12 +29,13 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
fcache "k8s.io/client-go/tools/cache/testing"
testingclock "k8s.io/utils/clock/testing"
)
@ -123,7 +124,7 @@ func isRegistered(i SharedInformer, h ResourceEventHandlerRegistration) bool {
func TestIndexer(t *testing.T) {
assert := assert.New(t)
// source simulates an apiserver object endpoint.
source := fcache.NewFakeControllerSource()
source := newFakeControllerSource(t)
pod1 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", Labels: map[string]string{"a": "a-val", "b": "b-val1"}}}
pod2 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", Labels: map[string]string{"b": "b-val2"}}}
pod3 := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod3", Labels: map[string]string{"a": "a-val2"}}}
@ -197,7 +198,7 @@ func TestIndexer(t *testing.T) {
func TestListenerResyncPeriods(t *testing.T) {
// source simulates an apiserver object endpoint.
source := fcache.NewFakeControllerSource()
source := newFakeControllerSource(t)
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2"}})
@ -284,7 +285,7 @@ func TestListenerResyncPeriods(t *testing.T) {
func TestResyncCheckPeriod(t *testing.T) {
// source simulates an apiserver object endpoint.
source := fcache.NewFakeControllerSource()
source := newFakeControllerSource(t)
// create the shared informer and resync every 12 hours
informer := NewSharedInformer(source, &v1.Pod{}, 12*time.Hour).(*sharedIndexInformer)
@ -356,7 +357,7 @@ func TestResyncCheckPeriod(t *testing.T) {
// verify that https://github.com/kubernetes/kubernetes/issues/59822 is fixed
func TestSharedInformerInitializationRace(t *testing.T) {
source := fcache.NewFakeControllerSource()
source := newFakeControllerSource(t)
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
listener := newTestListener("raceListener", 0)
@ -371,7 +372,7 @@ func TestSharedInformerInitializationRace(t *testing.T) {
// resync and no resync see the expected state.
func TestSharedInformerWatchDisruption(t *testing.T) {
// source simulates an apiserver object endpoint.
source := fcache.NewFakeControllerSource()
source := newFakeControllerSource(t)
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", UID: "pod1", ResourceVersion: "1"}})
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2", ResourceVersion: "2"}})
@ -446,7 +447,7 @@ func TestSharedInformerWatchDisruption(t *testing.T) {
}
func TestSharedInformerErrorHandling(t *testing.T) {
source := fcache.NewFakeControllerSource()
source := newFakeControllerSource(t)
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
source.ListError = fmt.Errorf("Access Denied")
@ -474,7 +475,7 @@ func TestSharedInformerErrorHandling(t *testing.T) {
// TestSharedInformerStartRace is a regression test to ensure there is no race between
// Run and SetWatchErrorHandler, and Run and SetTransform.
func TestSharedInformerStartRace(t *testing.T) {
source := fcache.NewFakeControllerSource()
source := newFakeControllerSource(t)
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
stop := make(chan struct{})
go func() {
@ -500,7 +501,7 @@ func TestSharedInformerStartRace(t *testing.T) {
func TestSharedInformerTransformer(t *testing.T) {
// source simulates an apiserver object endpoint.
source := fcache.NewFakeControllerSource()
source := newFakeControllerSource(t)
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1", UID: "pod1", ResourceVersion: "1"}})
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod2", UID: "pod2", ResourceVersion: "2"}})
@ -531,7 +532,7 @@ func TestSharedInformerTransformer(t *testing.T) {
}
func TestSharedInformerRemoveHandler(t *testing.T) {
source := fcache.NewFakeControllerSource()
source := newFakeControllerSource(t)
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second)
@ -569,12 +570,12 @@ func TestSharedInformerRemoveHandler(t *testing.T) {
}
func TestSharedInformerRemoveForeignHandler(t *testing.T) {
source := fcache.NewFakeControllerSource()
source := newFakeControllerSource(t)
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
source2 := fcache.NewFakeControllerSource()
source2 := newFakeControllerSource(t)
source2.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
informer2 := NewSharedInformer(source2, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
@ -651,7 +652,7 @@ func TestSharedInformerRemoveForeignHandler(t *testing.T) {
}
func TestSharedInformerMultipleRegistration(t *testing.T) {
source := fcache.NewFakeControllerSource()
source := newFakeControllerSource(t)
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
@ -719,7 +720,7 @@ func TestSharedInformerMultipleRegistration(t *testing.T) {
}
func TestRemovingRemovedSharedInformer(t *testing.T) {
source := fcache.NewFakeControllerSource()
source := newFakeControllerSource(t)
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
@ -751,7 +752,7 @@ func TestRemovingRemovedSharedInformer(t *testing.T) {
// listeners without tripping it up. There are not really many assertions in this
// test. Meant to be run with -race to find race conditions
func TestSharedInformerHandlerAbuse(t *testing.T) {
source := fcache.NewFakeControllerSource()
source := newFakeControllerSource(t)
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
ctx, cancel := context.WithCancel(context.Background())
@ -865,7 +866,7 @@ func TestSharedInformerHandlerAbuse(t *testing.T) {
}
func TestStateSharedInformer(t *testing.T) {
source := fcache.NewFakeControllerSource()
source := newFakeControllerSource(t)
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
@ -914,7 +915,7 @@ func TestStateSharedInformer(t *testing.T) {
}
func TestAddOnStoppedSharedInformer(t *testing.T) {
source := fcache.NewFakeControllerSource()
source := newFakeControllerSource(t)
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
@ -947,7 +948,7 @@ func TestAddOnStoppedSharedInformer(t *testing.T) {
}
func TestRemoveOnStoppedSharedInformer(t *testing.T) {
source := fcache.NewFakeControllerSource()
source := newFakeControllerSource(t)
source.Add(&v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "pod1"}})
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second).(*sharedIndexInformer)
@ -976,7 +977,7 @@ func TestRemoveOnStoppedSharedInformer(t *testing.T) {
func TestRemoveWhileActive(t *testing.T) {
// source simulates an apiserver object endpoint.
source := fcache.NewFakeControllerSource()
source := newFakeControllerSource(t)
// create the shared informer and resync every 12 hours
informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer)
@ -1012,7 +1013,7 @@ func TestRemoveWhileActive(t *testing.T) {
func TestAddWhileActive(t *testing.T) {
// source simulates an apiserver object endpoint.
source := fcache.NewFakeControllerSource()
source := newFakeControllerSource(t)
// create the shared informer and resync every 12 hours
informer := NewSharedInformer(source, &v1.Pod{}, 0).(*sharedIndexInformer)
@ -1072,3 +1073,52 @@ func TestAddWhileActive(t *testing.T) {
return
}
}
// TestShutdown depends on goleak.VerifyTestMain in main_test.go to verify that
// all goroutines really have stopped in the different scenarios.
func TestShutdown(t *testing.T) {
t.Run("no-context", func(t *testing.T) {
source := newFakeControllerSource(t)
stop := make(chan struct{})
defer close(stop)
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second)
handler, err := informer.AddEventHandler(ResourceEventHandlerFuncs{
AddFunc: func(_ any) {},
})
require.NoError(t, err)
defer func() {
assert.NoError(t, informer.RemoveEventHandler(handler))
}()
go informer.Run(stop)
require.Eventually(t, informer.HasSynced, time.Minute, time.Millisecond, "informer has synced")
})
t.Run("no-context-later", func(t *testing.T) {
source := newFakeControllerSource(t)
stop := make(chan struct{})
defer close(stop)
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second)
go informer.Run(stop)
require.Eventually(t, informer.HasSynced, time.Minute, time.Millisecond, "informer has synced")
handler, err := informer.AddEventHandler(ResourceEventHandlerFuncs{
AddFunc: func(_ any) {},
})
require.NoError(t, err)
assert.NoError(t, informer.RemoveEventHandler(handler))
})
t.Run("no-run", func(t *testing.T) {
source := newFakeControllerSource(t)
informer := NewSharedInformer(source, &v1.Pod{}, 1*time.Second)
_, err := informer.AddEventHandler(ResourceEventHandlerFuncs{
AddFunc: func(_ any) {},
})
require.NoError(t, err)
// At this point, neither informer nor handler have any goroutines running
// and it doesn't matter that nothing gets stopped or removed.
})
}

29
tools/cache/util_test.go vendored Normal file
View File

@ -0,0 +1,29 @@
/*
Copyright 2024 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cache
import (
"testing"
fcache "k8s.io/client-go/tools/cache/testing"
)
func newFakeControllerSource(tb testing.TB) *fcache.FakeControllerSource {
source := fcache.NewFakeControllerSource()
tb.Cleanup(source.Shutdown)
return source
}