fix goroutine leaks in TestConfigurationChannels

Signed-off-by: cyclinder <qifeng.guo@daocloud.io>
This commit is contained in:
cyclinder
2021-12-21 20:16:51 +08:00
parent 13e97453f9
commit 928e686877
5 changed files with 93 additions and 31 deletions

View File

@@ -17,6 +17,7 @@ limitations under the License.
package config
import (
"context"
"math/rand"
"reflect"
"sort"
@@ -85,10 +86,10 @@ func CreatePodUpdate(op kubetypes.PodOperation, source string, pods ...*v1.Pod)
return kubetypes.PodUpdate{Pods: pods, Op: op, Source: source}
}
func createPodConfigTester(mode PodConfigNotificationMode) (chan<- interface{}, <-chan kubetypes.PodUpdate, *PodConfig) {
func createPodConfigTester(ctx context.Context, mode PodConfigNotificationMode) (chan<- interface{}, <-chan kubetypes.PodUpdate, *PodConfig) {
eventBroadcaster := record.NewBroadcaster()
config := NewPodConfig(mode, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}))
channel := config.Channel(TestSource)
channel := config.Channel(ctx, TestSource)
ch := config.Updates()
return channel, ch, config
}
@@ -129,7 +130,10 @@ func expectNoPodUpdate(t *testing.T, ch <-chan kubetypes.PodUpdate) {
}
func TestNewPodAdded(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationIncremental)
// see an update
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
@@ -141,7 +145,10 @@ func TestNewPodAdded(t *testing.T) {
}
func TestNewPodAddedInvalidNamespace(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationIncremental)
// see an update
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", ""))
@@ -153,7 +160,10 @@ func TestNewPodAddedInvalidNamespace(t *testing.T) {
}
func TestNewPodAddedDefaultNamespace(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationIncremental)
// see an update
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "default"))
@@ -165,7 +175,10 @@ func TestNewPodAddedDefaultNamespace(t *testing.T) {
}
func TestNewPodAddedDifferentNamespaces(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationIncremental)
// see an update
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "default"))
@@ -182,7 +195,10 @@ func TestNewPodAddedDifferentNamespaces(t *testing.T) {
}
func TestInvalidPodFiltered(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental)
// see an update
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
@@ -196,7 +212,10 @@ func TestInvalidPodFiltered(t *testing.T) {
}
func TestNewPodAddedSnapshotAndUpdates(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationSnapshotAndUpdates)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationSnapshotAndUpdates)
// see an set
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
@@ -214,7 +233,10 @@ func TestNewPodAddedSnapshotAndUpdates(t *testing.T) {
}
func TestNewPodAddedSnapshot(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationSnapshot)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationSnapshot)
// see an set
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
@@ -232,7 +254,10 @@ func TestNewPodAddedSnapshot(t *testing.T) {
}
func TestNewPodAddedUpdatedRemoved(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental)
// should register an add
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
@@ -255,7 +280,10 @@ func TestNewPodAddedUpdatedRemoved(t *testing.T) {
}
func TestNewPodAddedDelete(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental)
// should register an add
addedPod := CreateValidPod("foo", "new")
@@ -274,7 +302,10 @@ func TestNewPodAddedDelete(t *testing.T) {
}
func TestNewPodAddedUpdatedSet(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental)
// should register an add
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"), CreateValidPod("foo2", "new"), CreateValidPod("foo3", "new"))
@@ -296,6 +327,9 @@ func TestNewPodAddedUpdatedSet(t *testing.T) {
}
func TestNewPodAddedSetReconciled(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create and touch new test pods, return the new pods and touched pod. We should create new pod list
// before touching to avoid data race.
newTestPods := func(touchStatus, touchSpec bool) ([]*v1.Pod, *v1.Pod) {
@@ -318,7 +352,7 @@ func TestNewPodAddedSetReconciled(t *testing.T) {
} {
var podWithStatusChange *v1.Pod
pods, _ := newTestPods(false, false)
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental)
// Use SET to initialize the config, especially initialize the source set
channel <- CreatePodUpdate(kubetypes.SET, TestSource, pods...)
@@ -341,6 +375,9 @@ func TestNewPodAddedSetReconciled(t *testing.T) {
}
func TestInitialEmptySet(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for _, test := range []struct {
mode PodConfigNotificationMode
op kubetypes.PodOperation
@@ -349,7 +386,7 @@ func TestInitialEmptySet(t *testing.T) {
{PodConfigNotificationSnapshot, kubetypes.SET},
{PodConfigNotificationSnapshotAndUpdates, kubetypes.SET},
} {
channel, ch, _ := createPodConfigTester(test.mode)
channel, ch, _ := createPodConfigTester(ctx, test.mode)
// should register an empty PodUpdate operation
podUpdate := CreatePodUpdate(kubetypes.SET, TestSource)
@@ -366,7 +403,10 @@ func TestInitialEmptySet(t *testing.T) {
}
func TestPodUpdateAnnotations(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental)
pod := CreateValidPod("foo2", "new")
pod.Annotations = make(map[string]string)
@@ -395,7 +435,10 @@ func TestPodUpdateAnnotations(t *testing.T) {
}
func TestPodUpdateLabels(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental)
pod := CreateValidPod("foo2", "new")
pod.Labels = make(map[string]string)