diff --git a/pkg/kubelet/config/config.go b/pkg/kubelet/config/config.go index f6befde2d7f..7a83868cb8a 100644 --- a/pkg/kubelet/config/config.go +++ b/pkg/kubelet/config/config.go @@ -17,6 +17,7 @@ limitations under the License. package config import ( + "context" "fmt" "reflect" "sync" @@ -81,11 +82,11 @@ func NewPodConfig(mode PodConfigNotificationMode, recorder record.EventRecorder) // Channel creates or returns a config source channel. The channel // only accepts PodUpdates -func (c *PodConfig) Channel(source string) chan<- interface{} { +func (c *PodConfig) Channel(ctx context.Context, source string) chan<- interface{} { c.sourcesLock.Lock() defer c.sourcesLock.Unlock() c.sources.Insert(source) - return c.mux.Channel(source) + return c.mux.ChannelWithContext(ctx, source) } // SeenAllSources returns true if seenSources contains all sources in the diff --git a/pkg/kubelet/config/config_test.go b/pkg/kubelet/config/config_test.go index 1b8acdfef50..cc1cdc53077 100644 --- a/pkg/kubelet/config/config_test.go +++ b/pkg/kubelet/config/config_test.go @@ -17,6 +17,7 @@ limitations under the License. package config import ( + "context" "math/rand" "reflect" "sort" @@ -85,10 +86,10 @@ func CreatePodUpdate(op kubetypes.PodOperation, source string, pods ...*v1.Pod) return kubetypes.PodUpdate{Pods: pods, Op: op, Source: source} } -func createPodConfigTester(mode PodConfigNotificationMode) (chan<- interface{}, <-chan kubetypes.PodUpdate, *PodConfig) { +func createPodConfigTester(ctx context.Context, mode PodConfigNotificationMode) (chan<- interface{}, <-chan kubetypes.PodUpdate, *PodConfig) { eventBroadcaster := record.NewBroadcaster() config := NewPodConfig(mode, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"})) - channel := config.Channel(TestSource) + channel := config.Channel(ctx, TestSource) ch := config.Updates() return channel, ch, config } @@ -129,7 +130,10 @@ func expectNoPodUpdate(t *testing.T, ch <-chan kubetypes.PodUpdate) { } func TestNewPodAdded(t *testing.T) { - channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationIncremental) // see an update podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new")) @@ -141,7 +145,10 @@ func TestNewPodAdded(t *testing.T) { } func TestNewPodAddedInvalidNamespace(t *testing.T) { - channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationIncremental) // see an update podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "")) @@ -153,7 +160,10 @@ func TestNewPodAddedInvalidNamespace(t *testing.T) { } func TestNewPodAddedDefaultNamespace(t *testing.T) { - channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationIncremental) // see an update podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "default")) @@ -165,7 +175,10 @@ func TestNewPodAddedDefaultNamespace(t *testing.T) { } func TestNewPodAddedDifferentNamespaces(t *testing.T) { - channel, ch, config := createPodConfigTester(PodConfigNotificationIncremental) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationIncremental) // see an update podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "default")) @@ -182,7 +195,10 @@ func TestNewPodAddedDifferentNamespaces(t *testing.T) { } func TestInvalidPodFiltered(t *testing.T) { - channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental) // see an update podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new")) @@ -196,7 +212,10 @@ func TestInvalidPodFiltered(t *testing.T) { } func TestNewPodAddedSnapshotAndUpdates(t *testing.T) { - channel, ch, config := createPodConfigTester(PodConfigNotificationSnapshotAndUpdates) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationSnapshotAndUpdates) // see an set podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new")) @@ -214,7 +233,10 @@ func TestNewPodAddedSnapshotAndUpdates(t *testing.T) { } func TestNewPodAddedSnapshot(t *testing.T) { - channel, ch, config := createPodConfigTester(PodConfigNotificationSnapshot) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationSnapshot) // see an set podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new")) @@ -232,7 +254,10 @@ func TestNewPodAddedSnapshot(t *testing.T) { } func TestNewPodAddedUpdatedRemoved(t *testing.T) { - channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental) // should register an add podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new")) @@ -255,7 +280,10 @@ func TestNewPodAddedUpdatedRemoved(t *testing.T) { } func TestNewPodAddedDelete(t *testing.T) { - channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental) // should register an add addedPod := CreateValidPod("foo", "new") @@ -274,7 +302,10 @@ func TestNewPodAddedDelete(t *testing.T) { } func TestNewPodAddedUpdatedSet(t *testing.T) { - channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental) // should register an add podUpdate := CreatePodUpdate(kubetypes.ADD, TestSource, CreateValidPod("foo", "new"), CreateValidPod("foo2", "new"), CreateValidPod("foo3", "new")) @@ -296,6 +327,9 @@ func TestNewPodAddedUpdatedSet(t *testing.T) { } func TestNewPodAddedSetReconciled(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + // Create and touch new test pods, return the new pods and touched pod. We should create new pod list // before touching to avoid data race. newTestPods := func(touchStatus, touchSpec bool) ([]*v1.Pod, *v1.Pod) { @@ -318,7 +352,7 @@ func TestNewPodAddedSetReconciled(t *testing.T) { } { var podWithStatusChange *v1.Pod pods, _ := newTestPods(false, false) - channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental) + channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental) // Use SET to initialize the config, especially initialize the source set channel <- CreatePodUpdate(kubetypes.SET, TestSource, pods...) @@ -341,6 +375,9 @@ func TestNewPodAddedSetReconciled(t *testing.T) { } func TestInitialEmptySet(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + for _, test := range []struct { mode PodConfigNotificationMode op kubetypes.PodOperation @@ -349,7 +386,7 @@ func TestInitialEmptySet(t *testing.T) { {PodConfigNotificationSnapshot, kubetypes.SET}, {PodConfigNotificationSnapshotAndUpdates, kubetypes.SET}, } { - channel, ch, _ := createPodConfigTester(test.mode) + channel, ch, _ := createPodConfigTester(ctx, test.mode) // should register an empty PodUpdate operation podUpdate := CreatePodUpdate(kubetypes.SET, TestSource) @@ -366,7 +403,10 @@ func TestInitialEmptySet(t *testing.T) { } func TestPodUpdateAnnotations(t *testing.T) { - channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental) pod := CreateValidPod("foo2", "new") pod.Annotations = make(map[string]string) @@ -395,7 +435,10 @@ func TestPodUpdateAnnotations(t *testing.T) { } func TestPodUpdateLabels(t *testing.T) { - channel, ch, _ := createPodConfigTester(PodConfigNotificationIncremental) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental) pod := CreateValidPod("foo2", "new") pod.Labels = make(map[string]string) diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index 0e6f5f946de..443b57595b2 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -272,21 +272,24 @@ func makePodSourceConfig(kubeCfg *kubeletconfiginternal.KubeletConfiguration, ku // source of all configuration cfg := config.NewPodConfig(config.PodConfigNotificationIncremental, kubeDeps.Recorder) + // TODO: it needs to be replaced by a proper context in the future + ctx := context.TODO() + // define file config source if kubeCfg.StaticPodPath != "" { klog.InfoS("Adding static pod path", "path", kubeCfg.StaticPodPath) - config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(kubetypes.FileSource)) + config.NewSourceFile(kubeCfg.StaticPodPath, nodeName, kubeCfg.FileCheckFrequency.Duration, cfg.Channel(ctx, kubetypes.FileSource)) } // define url config source if kubeCfg.StaticPodURL != "" { klog.InfoS("Adding pod URL with HTTP header", "URL", kubeCfg.StaticPodURL, "header", manifestURLHeader) - config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(kubetypes.HTTPSource)) + config.NewSourceURL(kubeCfg.StaticPodURL, manifestURLHeader, nodeName, kubeCfg.HTTPCheckFrequency.Duration, cfg.Channel(ctx, kubetypes.HTTPSource)) } if kubeDeps.KubeClient != nil { klog.InfoS("Adding apiserver pod source") - config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, nodeHasSynced, cfg.Channel(kubetypes.ApiserverSource)) + config.NewSourceApiserver(kubeDeps.KubeClient, nodeName, nodeHasSynced, cfg.Channel(ctx, kubetypes.ApiserverSource)) } return cfg, nil } diff --git a/pkg/util/config/config.go b/pkg/util/config/config.go index c789daa2283..66fce13866d 100644 --- a/pkg/util/config/config.go +++ b/pkg/util/config/config.go @@ -17,6 +17,7 @@ limitations under the License. package config import ( + "context" "sync" "k8s.io/apimachinery/pkg/util/wait" @@ -57,12 +58,12 @@ func NewMux(merger Merger) *Mux { return mux } -// Channel returns a channel where a configuration source +// ChannelWithContext returns a channel where a configuration source // can send updates of new configurations. Multiple calls with the same // source will return the same channel. This allows change and state based sources // to use the same channel. Different source names however will be treated as a // union. -func (m *Mux) Channel(source string) chan interface{} { +func (m *Mux) ChannelWithContext(ctx context.Context, source string) chan interface{} { if len(source) == 0 { panic("Channel given an empty name") } @@ -74,7 +75,8 @@ func (m *Mux) Channel(source string) chan interface{} { } newChannel := make(chan interface{}) m.sources[source] = newChannel - go wait.Until(func() { m.listen(source, newChannel) }, 0, wait.NeverStop) + + go wait.Until(func() { m.listen(source, newChannel) }, 0, ctx.Done()) return newChannel } diff --git a/pkg/util/config/config_test.go b/pkg/util/config/config_test.go index 667288b9d0a..060e4ec4c98 100644 --- a/pkg/util/config/config_test.go +++ b/pkg/util/config/config_test.go @@ -17,17 +17,21 @@ limitations under the License. package config import ( + "context" "reflect" "testing" ) func TestConfigurationChannels(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + mux := NewMux(nil) - channelOne := mux.Channel("one") - if channelOne != mux.Channel("one") { + channelOne := mux.ChannelWithContext(ctx, "one") + if channelOne != mux.ChannelWithContext(ctx, "one") { t.Error("Didn't get the same muxuration channel back with the same name") } - channelTwo := mux.Channel("two") + channelTwo := mux.ChannelWithContext(ctx, "two") if channelOne == channelTwo { t.Error("Got back the same muxuration channel for different names") } @@ -50,12 +54,18 @@ func (m MergeMock) Merge(source string, update interface{}) error { } func TestMergeInvoked(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + merger := MergeMock{"one", "test", t} mux := NewMux(&merger) - mux.Channel("one") <- "test" + mux.ChannelWithContext(ctx, "one") <- "test" } func TestMergeFuncInvoked(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ch := make(chan bool) mux := NewMux(MergeFunc(func(source string, update interface{}) error { if source != "one" { @@ -67,11 +77,14 @@ func TestMergeFuncInvoked(t *testing.T) { ch <- true return nil })) - mux.Channel("one") <- "test" + mux.ChannelWithContext(ctx, "one") <- "test" <-ch } func TestSimultaneousMerge(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ch := make(chan bool, 2) mux := NewMux(MergeFunc(func(source string, update interface{}) error { switch source { @@ -89,8 +102,8 @@ func TestSimultaneousMerge(t *testing.T) { ch <- true return nil })) - source := mux.Channel("one") - source2 := mux.Channel("two") + source := mux.ChannelWithContext(ctx, "one") + source2 := mux.ChannelWithContext(ctx, "two") source <- "test" source2 <- "test2" <-ch