Merge pull request #107163 from cyclinder/fix_leak_goroutine

fix goroutine leaks in TestConfigurationChannels
This commit is contained in:
Kubernetes Prow Robot 2022-01-10 17:23:16 -08:00 committed by GitHub
commit a0dfd958d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 93 additions and 31 deletions

View File

@ -17,6 +17,7 @@ limitations under the License.
package config
import (
"context"
"fmt"
"reflect"
"sync"
@ -81,11 +82,11 @@ func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder)
// Channel creates or returns a config source channel. The channel
// only accepts PodUpdates
func (c *PodConfig) Channel(source string) chan<- interface{} {
func (c *PodConfig) Channel(ctx context.Context, source string) chan<- interface{} {
c.sourcesLock.Lock()
defer c.sourcesLock.Unlock()
c.sources.Insert(source)
return c.mux.Channel(source)
return c.mux.ChannelWithContext(ctx, source)
}
// SeenAllSources returns true if seenSources contains all sources in the

View File

@ -17,6 +17,7 @@ limitations under the License.
package config
import (
"context"
"math/rand"
"reflect"
"sort"
@ -85,10 +86,10 @@ func CreatePodUpdate(op kubetypes.PodOperation, source string, pods ...*v1.Pod)
return kubetypes.PodUpdate{Pods: pods, Op: op, Source: source}
}
func createPodConfigTester(mode PodConfigNotificationMode) (chan<- interface{}, <-chan kubetypes.PodUpdate, *PodConfig) {
func createPodConfigTester(ctx context.Context, mode PodConfigNotificationMode) (chan<- interface{}, <-chan kubetypes.PodUpdate, *PodConfig) {
eventBroadcaster := record.NewBroadcaster()
config := NewPodConfig(mode, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}))
channel := config.Channel(TestSource)
channel := config.Channel(ctx, TestSource)
ch := config.Updates()
return channel, ch, config
}
@ -129,7 +130,10 @@ func expectNoPodUpdate(t *testing.T, ch <-chan kubetypes.PodUpdate) {
}
func TestNewPodAdded(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationIncremental)
// see an update
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
@ -141,7 +145,10 @@ func TestNewPodAdded(t *testing.T) {
}
func TestNewPodAddedInvalidNamespace(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationIncremental)
// see an update
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", ""))
@ -153,7 +160,10 @@ func TestNewPodAddedInvalidNamespace(t *testing.T) {
}
func TestNewPodAddedDefaultNamespace(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationIncremental)
// see an update
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "default"))
@ -165,7 +175,10 @@ func TestNewPodAddedDefaultNamespace(t *testing.T) {
}
func TestNewPodAddedDifferentNamespaces(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationIncremental)
// see an update
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "default"))
@ -182,7 +195,10 @@ func TestNewPodAddedDifferentNamespaces(t *testing.T) {
}
func TestInvalidPodFiltered(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental)
// see an update
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
@ -196,7 +212,10 @@ func TestInvalidPodFiltered(t *testing.T) {
}
func TestNewPodAddedSnapshotAndUpdates(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationSnapshotAndUpdates)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationSnapshotAndUpdates)
// see an set
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
@ -214,7 +233,10 @@ func TestNewPodAddedSnapshotAndUpdates(t *testing.T) {
}
func TestNewPodAddedSnapshot(t *testing.T) {
channel, ch, config := createPodConfigTester(PodConfigNotificationSnapshot)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationSnapshot)
// see an set
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
@ -232,7 +254,10 @@ func TestNewPodAddedSnapshot(t *testing.T) {
}
func TestNewPodAddedUpdatedRemoved(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental)
// should register an add
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"))
@ -255,7 +280,10 @@ func TestNewPodAddedUpdatedRemoved(t *testing.T) {
}
func TestNewPodAddedDelete(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental)
// should register an add
addedPod := CreateValidPod("foo", "new")
@ -274,7 +302,10 @@ func TestNewPodAddedDelete(t *testing.T) {
}
func TestNewPodAddedUpdatedSet(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental)
// should register an add
podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"), CreateValidPod("foo2", "new"), CreateValidPod("foo3", "new"))
@ -296,6 +327,9 @@ func TestNewPodAddedUpdatedSet(t *testing.T) {
}
func TestNewPodAddedSetReconciled(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Create and touch new test pods, return the new pods and touched pod. We should create new pod list
// before touching to avoid data race.
newTestPods := func(touchStatus, touchSpec bool) ([]*v1.Pod, *v1.Pod) {
@ -318,7 +352,7 @@ func TestNewPodAddedSetReconciled(t *testing.T) {
} {
var podWithStatusChange *v1.Pod
pods, _ := newTestPods(false, false)
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental)
// Use SET to initialize the config, especially initialize the source set
channel <- CreatePodUpdate(kubetypes.SET, TestSource, pods...)
@ -341,6 +375,9 @@ func TestNewPodAddedSetReconciled(t *testing.T) {
}
func TestInitialEmptySet(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
for _, test := range []struct {
mode PodConfigNotificationMode
op kubetypes.PodOperation
@ -349,7 +386,7 @@ func TestInitialEmptySet(t *testing.T) {
{PodConfigNotificationSnapshot, kubetypes.SET},
{PodConfigNotificationSnapshotAndUpdates, kubetypes.SET},
} {
channel, ch, _ := createPodConfigTester(test.mode)
channel, ch, _ := createPodConfigTester(ctx, test.mode)
// should register an empty PodUpdate operation
podUpdate := CreatePodUpdate(kubetypes.SET, TestSource)
@ -366,7 +403,10 @@ func TestInitialEmptySet(t *testing.T) {
}
func TestPodUpdateAnnotations(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental)
pod := CreateValidPod("foo2", "new")
pod.Annotations = make(map[string]string)
@ -395,7 +435,10 @@ func TestPodUpdateAnnotations(t *testing.T) {
}
func TestPodUpdateLabels(t *testing.T) {
channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental)
pod := CreateValidPod("foo2", "new")
pod.Labels = make(map[string]string)

View File

@ -270,21 +270,24 @@ func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, ku
// source of all configuration
cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder)
// TODO: it needs to be replaced by a proper context in the future
ctx := context.TODO()
// define file config source
if kubeCfg.StaticPodPath != "" {
klog.InfoS("Adding static pod path", "path", kubeCfg.StaticPodPath)
config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource))
config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(ctx, kubetypes.FileSource))
}
// define url config source
if kubeCfg.StaticPodURL != "" {
klog.InfoS("Adding pod URL with HTTP header", "URL", kubeCfg.StaticPodURL, "header", manifestURLHeader)
config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource))
config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(ctx, kubetypes.HTTPSource))
}
if kubeDeps.KubeClient != nil {
klog.InfoS("Adding apiserver pod source")
config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, nodeHasSynced, cfg.Channel(kubetypes.ApiserverSource))
config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, nodeHasSynced, cfg.Channel(ctx, kubetypes.ApiserverSource))
}
return cfg, nil
}

View File

@ -17,6 +17,7 @@ limitations under the License.
package config
import (
"context"
"sync"
"k8s.io/apimachinery/pkg/util/wait"
@ -57,12 +58,12 @@ func NewMux(merger Merger) *Mux {
return mux
}
// Channel returns a channel where a configuration source
// ChannelWithContext returns a channel where a configuration source
// can send updates of new configurations. Multiple calls with the same
// source will return the same channel. This allows change and state based sources
// to use the same channel. Different source names however will be treated as a
// union.
func (m *Mux) Channel(source string) chan interface{} {
func (m *Mux) ChannelWithContext(ctx context.Context, source string) chan interface{} {
if len(source) == 0 {
panic("Channel given an empty name")
}
@ -74,7 +75,8 @@ func (m *Mux) Channel(source string) chan interface{} {
}
newChannel := make(chan interface{})
m.sources[source] = newChannel
go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop)
go wait.Until(func() { m.listen(source, newChannel) }, 0, ctx.Done())
return newChannel
}

View File

@ -17,17 +17,21 @@ limitations under the License.
package config
import (
"context"
"reflect"
"testing"
)
func TestConfigurationChannels(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mux := NewMux(nil)
channelOne := mux.Channel("one")
if channelOne != mux.Channel("one") {
channelOne := mux.ChannelWithContext(ctx, "one")
if channelOne != mux.ChannelWithContext(ctx, "one") {
t.Error("Didn't get the same muxuration channel back with the same name")
}
channelTwo := mux.Channel("two")
channelTwo := mux.ChannelWithContext(ctx, "two")
if channelOne == channelTwo {
t.Error("Got back the same muxuration channel for different names")
}
@ -50,12 +54,18 @@ func (m MergeMock) Merge(source string, update interface{}) error {
}
func TestMergeInvoked(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
merger := MergeMock{"one", "test", t}
mux := NewMux(&merger)
mux.Channel("one") <- "test"
mux.ChannelWithContext(ctx, "one") <- "test"
}
func TestMergeFuncInvoked(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ch := make(chan bool)
mux := NewMux(MergeFunc(func(source string, update interface{}) error {
if source != "one" {
@ -67,11 +77,14 @@ func TestMergeFuncInvoked(t *testing.T) {
ch <- true
return nil
}))
mux.Channel("one") <- "test"
mux.ChannelWithContext(ctx, "one") <- "test"
<-ch
}
func TestSimultaneousMerge(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
ch := make(chan bool, 2)
mux := NewMux(MergeFunc(func(source string, update interface{}) error {
switch source {
@ -89,8 +102,8 @@ func TestSimultaneousMerge(t *testing.T) {
ch <- true
return nil
}))
source := mux.Channel("one")
source2 := mux.Channel("two")
source := mux.ChannelWithContext(ctx, "one")
source2 := mux.ChannelWithContext(ctx, "two")
source <- "test"
source2 <- "test2"
<-ch