diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 76e8cdf7b99..aba3cd00256 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -524,11 +524,11 @@ func initConfigz(kc *kubeletconfiginternal.KubeletConfiguration) error { } // makeEventRecorder sets up kubeDeps.Recorder if it's nil. It's a no-op otherwise. -func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) { +func makeEventRecorder(ctx context.Context, kubeDeps *kubelet.Dependencies, nodeName types.NodeName) { if kubeDeps.Recorder != nil { return } - eventBroadcaster := record.NewBroadcaster() + eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx)) kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)}) eventBroadcaster.StartStructuredLogging(3) if kubeDeps.EventClient != nil { @@ -738,7 +738,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend } // Setup event recorder if required. - makeEventRecorder(kubeDeps, nodeName) + makeEventRecorder(ctx, kubeDeps, nodeName) if kubeDeps.ContainerManager == nil { if s.CgroupsPerQOS && s.CgroupRoot == "" { @@ -1206,7 +1206,7 @@ func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencie } hostnameOverridden := len(kubeServer.HostnameOverride) > 0 // Setup event recorder if required. - makeEventRecorder(kubeDeps, nodeName) + makeEventRecorder(context.TODO(), kubeDeps, nodeName) nodeIPs, err := nodeutil.ParseNodeIPArgument(kubeServer.NodeIP, kubeServer.CloudProvider, utilfeature.DefaultFeatureGate.Enabled(features.CloudDualStackNodeIPs)) if err != nil { diff --git a/pkg/kubelet/config/config_test.go b/pkg/kubelet/config/config_test.go index 6137e291311..a0704816a8f 100644 --- a/pkg/kubelet/config/config_test.go +++ b/pkg/kubelet/config/config_test.go @@ -26,7 +26,7 @@ import ( "testing" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" @@ -35,6 +35,7 @@ import ( "k8s.io/client-go/tools/record" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/securitycontext" + "k8s.io/kubernetes/test/utils/ktesting" ) const ( @@ -93,7 +94,7 @@ func CreatePodUpdate(op kubetypes.PodOperation, source string, pods ...*v1.Pod) } func createPodConfigTester(ctx context.Context, mode PodConfigNotificationMode) (chan<- interface{}, <-chan kubetypes.PodUpdate, *PodConfig) { - eventBroadcaster := record.NewBroadcaster() + eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx)) config := NewPodConfig(mode, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}), &mockPodStartupSLIObserver{}) channel := config.Channel(ctx, TestSource) ch := config.Updates() @@ -136,7 +137,8 @@ func expectNoPodUpdate(t *testing.T, ch <-chan kubetypes.PodUpdate) { } func TestNewPodAdded(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationIncremental) @@ -151,7 +153,8 @@ func TestNewPodAdded(t *testing.T) { } func TestNewPodAddedInvalidNamespace(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationIncremental) @@ -166,7 +169,8 @@ func TestNewPodAddedInvalidNamespace(t *testing.T) { } func TestNewPodAddedDefaultNamespace(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationIncremental) @@ -181,7 +185,8 @@ func TestNewPodAddedDefaultNamespace(t *testing.T) { } func TestNewPodAddedDifferentNamespaces(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationIncremental) @@ -201,7 +206,8 @@ func TestNewPodAddedDifferentNamespaces(t *testing.T) { } func TestInvalidPodFiltered(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental) @@ -218,7 +224,8 @@ func TestInvalidPodFiltered(t *testing.T) { } func TestNewPodAddedSnapshotAndUpdates(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationSnapshotAndUpdates) @@ -239,7 +246,8 @@ func TestNewPodAddedSnapshotAndUpdates(t *testing.T) { } func TestNewPodAddedSnapshot(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationSnapshot) @@ -260,7 +268,8 @@ func TestNewPodAddedSnapshot(t *testing.T) { } func TestNewPodAddedUpdatedRemoved(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental) @@ -286,7 +295,8 @@ func TestNewPodAddedUpdatedRemoved(t *testing.T) { } func TestNewPodAddedDelete(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental) @@ -308,7 +318,8 @@ func TestNewPodAddedDelete(t *testing.T) { } func TestNewPodAddedUpdatedSet(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental) @@ -333,7 +344,8 @@ func TestNewPodAddedUpdatedSet(t *testing.T) { } func TestNewPodAddedSetReconciled(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() // Create and touch new test pods, return the new pods and touched pod. We should create new pod list @@ -381,7 +393,8 @@ func TestNewPodAddedSetReconciled(t *testing.T) { } func TestInitialEmptySet(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() for _, test := range []struct { @@ -409,7 +422,8 @@ func TestInitialEmptySet(t *testing.T) { } func TestPodUpdateAnnotations(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental) @@ -441,7 +455,8 @@ func TestPodUpdateAnnotations(t *testing.T) { } func TestPodUpdateLabels(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental) @@ -464,7 +479,11 @@ func TestPodUpdateLabels(t *testing.T) { } func TestPodConfigRace(t *testing.T) { - eventBroadcaster := record.NewBroadcaster() + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx)) config := NewPodConfig(PodConfigNotificationIncremental, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}), &mockPodStartupSLIObserver{}) seenSources := sets.NewString(TestSource) var wg sync.WaitGroup @@ -472,7 +491,7 @@ func TestPodConfigRace(t *testing.T) { wg.Add(2) go func() { - ctx, cancel := context.WithCancel(context.Background()) + ctx, cancel := context.WithCancel(ctx) defer cancel() defer wg.Done() for i := 0; i < iterations; i++ {