kubelet: enhance context support

27a68aee3a introduced context support for events. Creating an event
broadcaster with context makes tests more resilient against leaking goroutines
when that context gets canceled at the end of a test and enables per-test
output via ktesting.

To use this in kubelet, a more thorough code update is needed. For now,
context.TODO serves as a reminder that this is necessary.
This commit is contained in:
Patrick Ohly 2023-12-01 09:00:59 +01:00
parent 46f4248d56
commit d11511b1e8
2 changed files with 41 additions and 22 deletions

View File

@ -524,11 +524,11 @@ func initConfigz(kc *kubeletconfiginternal.KubeletConfiguration) error {
} }
// makeEventRecorder sets up kubeDeps.Recorder if it's nil. It's a no-op otherwise. // makeEventRecorder sets up kubeDeps.Recorder if it's nil. It's a no-op otherwise.
func makeEventRecorder(kubeDeps *kubelet.Dependencies, nodeName types.NodeName) { func makeEventRecorder(ctx context.Context, kubeDeps *kubelet.Dependencies, nodeName types.NodeName) {
if kubeDeps.Recorder != nil { if kubeDeps.Recorder != nil {
return return
} }
eventBroadcaster := record.NewBroadcaster() eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)}) kubeDeps.Recorder = eventBroadcaster.NewRecorder(legacyscheme.Scheme, v1.EventSource{Component: componentKubelet, Host: string(nodeName)})
eventBroadcaster.StartStructuredLogging(3) eventBroadcaster.StartStructuredLogging(3)
if kubeDeps.EventClient != nil { if kubeDeps.EventClient != nil {
@ -738,7 +738,7 @@ func run(ctx context.Context, s *options.KubeletServer, kubeDeps *kubelet.Depend
} }
// Setup event recorder if required. // Setup event recorder if required.
makeEventRecorder(kubeDeps, nodeName) makeEventRecorder(ctx, kubeDeps, nodeName)
if kubeDeps.ContainerManager == nil { if kubeDeps.ContainerManager == nil {
if s.CgroupsPerQOS && s.CgroupRoot == "" { if s.CgroupsPerQOS && s.CgroupRoot == "" {
@ -1206,7 +1206,7 @@ func RunKubelet(kubeServer *options.KubeletServer, kubeDeps *kubelet.Dependencie
} }
hostnameOverridden := len(kubeServer.HostnameOverride) > 0 hostnameOverridden := len(kubeServer.HostnameOverride) > 0
// Setup event recorder if required. // Setup event recorder if required.
makeEventRecorder(kubeDeps, nodeName) makeEventRecorder(context.TODO(), kubeDeps, nodeName)
nodeIPs, err := nodeutil.ParseNodeIPArgument(kubeServer.NodeIP, kubeServer.CloudProvider, utilfeature.DefaultFeatureGate.Enabled(features.CloudDualStackNodeIPs)) nodeIPs, err := nodeutil.ParseNodeIPArgument(kubeServer.NodeIP, kubeServer.CloudProvider, utilfeature.DefaultFeatureGate.Enabled(features.CloudDualStackNodeIPs))
if err != nil { if err != nil {

View File

@ -26,7 +26,7 @@ import (
"testing" "testing"
"time" "time"
"k8s.io/api/core/v1" v1 "k8s.io/api/core/v1"
apiequality "k8s.io/apimachinery/pkg/api/equality" apiequality "k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/types"
@ -35,6 +35,7 @@ import (
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types" kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/securitycontext" "k8s.io/kubernetes/pkg/securitycontext"
"k8s.io/kubernetes/test/utils/ktesting"
) )
const ( const (
@ -93,7 +94,7 @@ func CreatePodUpdate(op kubetypes.PodOperation, source string, pods ...*v1.Pod)
} }
func createPodConfigTester(ctx context.Context, mode PodConfigNotificationMode) (chan<- interface{}, <-chan kubetypes.PodUpdate, *PodConfig) { func createPodConfigTester(ctx context.Context, mode PodConfigNotificationMode) (chan<- interface{}, <-chan kubetypes.PodUpdate, *PodConfig) {
eventBroadcaster := record.NewBroadcaster() eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
config := NewPodConfig(mode, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}), &mockPodStartupSLIObserver{}) config := NewPodConfig(mode, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}), &mockPodStartupSLIObserver{})
channel := config.Channel(ctx, TestSource) channel := config.Channel(ctx, TestSource)
ch := config.Updates() ch := config.Updates()
@ -136,7 +137,8 @@ func expectNoPodUpdate(t *testing.T, ch <-chan kubetypes.PodUpdate) {
} }
func TestNewPodAdded(t *testing.T) { func TestNewPodAdded(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationIncremental) channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationIncremental)
@ -151,7 +153,8 @@ func TestNewPodAdded(t *testing.T) {
} }
func TestNewPodAddedInvalidNamespace(t *testing.T) { func TestNewPodAddedInvalidNamespace(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationIncremental) channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationIncremental)
@ -166,7 +169,8 @@ func TestNewPodAddedInvalidNamespace(t *testing.T) {
} }
func TestNewPodAddedDefaultNamespace(t *testing.T) { func TestNewPodAddedDefaultNamespace(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationIncremental) channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationIncremental)
@ -181,7 +185,8 @@ func TestNewPodAddedDefaultNamespace(t *testing.T) {
} }
func TestNewPodAddedDifferentNamespaces(t *testing.T) { func TestNewPodAddedDifferentNamespaces(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationIncremental) channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationIncremental)
@ -201,7 +206,8 @@ func TestNewPodAddedDifferentNamespaces(t *testing.T) {
} }
func TestInvalidPodFiltered(t *testing.T) { func TestInvalidPodFiltered(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental) channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental)
@ -218,7 +224,8 @@ func TestInvalidPodFiltered(t *testing.T) {
} }
func TestNewPodAddedSnapshotAndUpdates(t *testing.T) { func TestNewPodAddedSnapshotAndUpdates(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationSnapshotAndUpdates) channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationSnapshotAndUpdates)
@ -239,7 +246,8 @@ func TestNewPodAddedSnapshotAndUpdates(t *testing.T) {
} }
func TestNewPodAddedSnapshot(t *testing.T) { func TestNewPodAddedSnapshot(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationSnapshot) channel, ch, config := createPodConfigTester(ctx, PodConfigNotificationSnapshot)
@ -260,7 +268,8 @@ func TestNewPodAddedSnapshot(t *testing.T) {
} }
func TestNewPodAddedUpdatedRemoved(t *testing.T) { func TestNewPodAddedUpdatedRemoved(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental) channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental)
@ -286,7 +295,8 @@ func TestNewPodAddedUpdatedRemoved(t *testing.T) {
} }
func TestNewPodAddedDelete(t *testing.T) { func TestNewPodAddedDelete(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental) channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental)
@ -308,7 +318,8 @@ func TestNewPodAddedDelete(t *testing.T) {
} }
func TestNewPodAddedUpdatedSet(t *testing.T) { func TestNewPodAddedUpdatedSet(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental) channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental)
@ -333,7 +344,8 @@ func TestNewPodAddedUpdatedSet(t *testing.T) {
} }
func TestNewPodAddedSetReconciled(t *testing.T) { func TestNewPodAddedSetReconciled(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
// Create and touch new test pods, return the new pods and touched pod. We should create new pod list // Create and touch new test pods, return the new pods and touched pod. We should create new pod list
@ -381,7 +393,8 @@ func TestNewPodAddedSetReconciled(t *testing.T) {
} }
func TestInitialEmptySet(t *testing.T) { func TestInitialEmptySet(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
for _, test := range []struct { for _, test := range []struct {
@ -409,7 +422,8 @@ func TestInitialEmptySet(t *testing.T) {
} }
func TestPodUpdateAnnotations(t *testing.T) { func TestPodUpdateAnnotations(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental) channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental)
@ -441,7 +455,8 @@ func TestPodUpdateAnnotations(t *testing.T) {
} }
func TestPodUpdateLabels(t *testing.T) { func TestPodUpdateLabels(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background()) _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental) channel, ch, _ := createPodConfigTester(ctx, PodConfigNotificationIncremental)
@ -464,7 +479,11 @@ func TestPodUpdateLabels(t *testing.T) {
} }
func TestPodConfigRace(t *testing.T) { func TestPodConfigRace(t *testing.T) {
eventBroadcaster := record.NewBroadcaster() _, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
config := NewPodConfig(PodConfigNotificationIncremental, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}), &mockPodStartupSLIObserver{}) config := NewPodConfig(PodConfigNotificationIncremental, eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "kubelet"}), &mockPodStartupSLIObserver{})
seenSources := sets.NewString(TestSource) seenSources := sets.NewString(TestSource)
var wg sync.WaitGroup var wg sync.WaitGroup
@ -472,7 +491,7 @@ func TestPodConfigRace(t *testing.T) {
wg.Add(2) wg.Add(2)
go func() { go func() {
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
defer wg.Done() defer wg.Done()
for i := 0; i < iterations; i++ { for i := 0; i < iterations; i++ {