From d11511b1e8121ef03584917c1b70686853d72bd7 Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Fri, 1 Dec 2023 09:00:59 +0100 Subject: [PATCH] kubelet: enhance context support 27a68aee3a4834 introduced context support for events. Creating an event broadcaster with context makes tests more resilient against leaking goroutines when that context gets canceled at the end of a test and enables per-test output via ktesting. To use this in kubelet, a more thorough code update is needed. For now, context.TODO serves as a reminder that this is necessary. --- cmd/kubelet/app/server.go | 8 ++--- pkg/kubelet/config/config_test.go | 55 +++++++++++++++++++++---------- 2 files changed, 41 insertions(+), 22 deletions(-) diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 76e8cdf7b99..aba3cd00256 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -524,11 +524,11 @@ func initConfigz(kc *kubeletconfiginternal.KubeletConfiguration) error { } // makeEventRecorder sets up kubeDeps.Recorder if it's nil. It's a no-op otherwise. -func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) { +func makeEventRecorder(ctx context.Context, kubeDeps *kubelet.Dependencies, nodeName types.NodeName) { if kubeDeps.Recorder != nil { return } - eventBroadcaster := record.NewBroadcaster() + eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx)) kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)}) eventBroadcaster.StartStructuredLogging(3) if kubeDeps.EventClient != nil { @@ -738,7 +738,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend } // Setup event recorder if required. - makeEventRecorder(kubeDeps, nodeName) + makeEventRecorder(ctx, kubeDeps, nodeName) if kubeDeps.ContainerManager == nil { if s.CgroupsPerQOS && s.CgroupRoot == "" { @@ -1206,7 +1206,7 @@ func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencie } hostnameOverridden := len(kubeServer.HostnameOverride) > 0 // Setup event recorder if required. - makeEventRecorder(kubeDeps, nodeName) + makeEventRecorder(context.TODO(), kubeDeps, nodeName) nodeIPs, err := nodeutil.ParseNodeIPArgument(kubeServer.NodeIP, kubeServer.CloudProvider, utilfeature.DefaultFeatureGate.Enabled(features.CloudDualStackNodeIPs)) if err != nil { diff --git a/pkg/kubelet/config/config_test.go b/pkg/kubelet/config/config_test.go index 6137e291311..a0704816a8f 100644 --- a/pkg/kubelet/config/config_test.go +++ b/pkg/kubelet/config/config_test.go @@ -26,7 +26,7 @@ import ( "testing" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -35,6 +35,7 @@ import ( "k8s.io/client-go/tools/record" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/securitycontext" + "k8s.io/kubernetes/test/utils/ktesting" ) const ( @@ -93,7 +94,7 @@ func CreatePodUpdate(op kubetypes.PodOperation, source string, pods ...*v1.Pod) } func createPodConfigTester(ctx context.Context, mode PodConfigNotificationMode) (chan<- interface{}, <-chan kubetypes.PodUpdate, *PodConfig) { - eventBroadcaster := record.NewBroadcaster() + eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx)) config := NewPodConfig(mode, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}), &mockPodStartupSLIObserver{}) channel := config.Channel(ctx, TestSource) ch := config.Updates() @@ -136,7 +137,8 @@ func expectNoPodUpdate(t *testing.T, ch <-chan kubetypes.PodUpdate) { } func TestNewPodAdded(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationIncremental) @@ -151,7 +153,8 @@ func TestNewPodAdded(t *testing.T) { } func TestNewPodAddedInvalidNamespace(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationIncremental) @@ -166,7 +169,8 @@ func TestNewPodAddedInvalidNamespace(t *testing.T) { } func TestNewPodAddedDefaultNamespace(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationIncremental) @@ -181,7 +185,8 @@ func TestNewPodAddedDefaultNamespace(t *testing.T) { } func TestNewPodAddedDifferentNamespaces(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationIncremental) @@ -201,7 +206,8 @@ func TestNewPodAddedDifferentNamespaces(t *testing.T) { } func TestInvalidPodFiltered(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental) @@ -218,7 +224,8 @@ func TestInvalidPodFiltered(t *testing.T) { } func TestNewPodAddedSnapshotAndUpdates(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationSnapshotAndUpdates) @@ -239,7 +246,8 @@ func TestNewPodAddedSnapshotAndUpdates(t *testing.T) { } func TestNewPodAddedSnapshot(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationSnapshot) @@ -260,7 +268,8 @@ func TestNewPodAddedSnapshot(t *testing.T) { } func TestNewPodAddedUpdatedRemoved(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental) @@ -286,7 +295,8 @@ func TestNewPodAddedUpdatedRemoved(t *testing.T) { } func TestNewPodAddedDelete(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental) @@ -308,7 +318,8 @@ func TestNewPodAddedDelete(t *testing.T) { } func TestNewPodAddedUpdatedSet(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental) @@ -333,7 +344,8 @@ func TestNewPodAddedUpdatedSet(t *testing.T) { } func TestNewPodAddedSetReconciled(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() // Create and touch new test pods, return the new pods and touched pod. We should create new pod list @@ -381,7 +393,8 @@ func TestNewPodAddedSetReconciled(t *testing.T) { } func TestInitialEmptySet(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() for _, test := range []struct { @@ -409,7 +422,8 @@ func TestInitialEmptySet(t *testing.T) { } func TestPodUpdateAnnotations(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental) @@ -441,7 +455,8 @@ func TestPodUpdateAnnotations(t *testing.T) { } func TestPodUpdateLabels(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental) @@ -464,7 +479,11 @@ func TestPodUpdateLabels(t *testing.T) { } func TestPodConfigRace(t *testing.T) { - eventBroadcaster := record.NewBroadcaster() + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx)) config := NewPodConfig(PodConfigNotificationIncremental, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}), &mockPodStartupSLIObserver{}) seenSources := sets.NewString(TestSource) var wg sync.WaitGroup @@ -472,7 +491,7 @@ func TestPodConfigRace(t *testing.T) { wg.Add(2) go func() { - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(ctx) defer cancel() defer wg.Done() for i := 0; i < iterations; i++ {