From 50c12437604b0cd5a73514389409fc2fde8b91bd Mon Sep 17 00:00:00 2001 From: Patrick Ohly Date: Fri, 1 Dec 2023 09:00:59 +0100 Subject: [PATCH] cloud providers: enhance context support 27a68aee3a4834 introduced context support for events. Creating an event broadcaster with context makes tests more resilient against leaking goroutines when that context gets canceled at the end of a test and enables per-test output via ktesting. While at it, all context.TODO calls get removed in files that were touched. --- .../controllers/node/node_controller.go | 41 +++--- .../controllers/node/node_controller_test.go | 57 +++++--- .../node_lifecycle_controller.go | 10 +- .../node_lifecycle_controller_test.go | 33 +++-- .../controllers/route/route_controller.go | 8 +- .../route/route_controller_test.go | 4 +- .../controllers/service/controller.go | 8 +- .../controllers/service/controller_test.go | 127 +++++++++++------- .../k8s.io/cloud-provider/options/options.go | 3 +- 9 files changed, 182 insertions(+), 109 deletions(-) diff --git a/staging/src/k8s.io/cloud-provider/controllers/node/node_controller.go b/staging/src/k8s.io/cloud-provider/controllers/node/node_controller.go index 45cbd16429b..07c8d23abd4 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/node/node_controller.go +++ b/staging/src/k8s.io/cloud-provider/controllers/node/node_controller.go @@ -120,9 +120,6 @@ func NewCloudNodeController( nodeStatusUpdateFrequency time.Duration, workerCount int32) (*CloudNodeController, error) { - eventBroadcaster := record.NewBroadcaster() - recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}) - _, instancesSupported := cloud.Instances() _, instancesV2Supported := cloud.InstancesV2() if !instancesSupported && !instancesV2Supported { @@ -132,8 +129,6 @@ func NewCloudNodeController( cnc := &CloudNodeController{ nodeInformer: nodeInformer, kubeClient: kubeClient, - broadcaster: eventBroadcaster, - recorder: recorder, cloud: cloud, nodeStatusUpdateFrequency: nodeStatusUpdateFrequency, workerCount: workerCount, @@ -156,7 +151,21 @@ func NewCloudNodeController( // This controller updates newly registered nodes with information // from the cloud provider. This call is blocking so should be called // via a goroutine +// +//logcheck:context // RunWithContext should be used instead of Run in code which supports contextual logging. func (cnc *CloudNodeController) Run(stopCh <-chan struct{}, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) { + cnc.RunWithContext(wait.ContextForChannel(stopCh), controllerManagerMetrics) +} + +// RunWithContext will sync informer caches and starting workers. +// This controller updates newly registered nodes with information +// from the cloud provider. This call is blocking so should be called +// via a goroutine +func (cnc *CloudNodeController) RunWithContext(ctx context.Context, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) { + cnc.broadcaster = record.NewBroadcaster(record.WithContext(ctx)) + cnc.recorder = cnc.broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"}) + stopCh := ctx.Done() + defer utilruntime.HandleCrash() defer cnc.workqueue.ShutDown() @@ -178,16 +187,16 @@ func (cnc *CloudNodeController) Run(stopCh <-chan struct{}, controllerManagerMet // The periodic loop for updateNodeStatus polls the Cloud Provider periodically // to reconcile the nodes addresses and labels. 
- go wait.Until(func() { - if err := cnc.UpdateNodeStatus(context.TODO()); err != nil { + go wait.UntilWithContext(ctx, func(ctx context.Context) { + if err := cnc.UpdateNodeStatus(ctx); err != nil { klog.Errorf("failed to update node status: %v", err) } - }, cnc.nodeStatusUpdateFrequency, stopCh) + }, cnc.nodeStatusUpdateFrequency) // These workers initialize the nodes added to the cluster, // those that are Tainted with TaintExternalCloudProvider. for i := int32(0); i < cnc.workerCount; i++ { - go wait.Until(cnc.runWorker, time.Second, stopCh) + go wait.UntilWithContext(ctx, cnc.runWorker, time.Second) } <-stopCh @@ -196,14 +205,14 @@ func (cnc *CloudNodeController) Run(stopCh <-chan struct{}, controllerManagerMet // runWorker is a long-running function that will continually call the // processNextWorkItem function in order to read and process a message on the // workqueue. -func (cnc *CloudNodeController) runWorker() { - for cnc.processNextWorkItem() { +func (cnc *CloudNodeController) runWorker(ctx context.Context) { + for cnc.processNextWorkItem(ctx) { } } // processNextWorkItem will read a single work item off the workqueue and // attempt to process it, by calling the syncHandler. -func (cnc *CloudNodeController) processNextWorkItem() bool { +func (cnc *CloudNodeController) processNextWorkItem(ctx context.Context) bool { obj, shutdown := cnc.workqueue.Get() if shutdown { return false @@ -223,7 +232,7 @@ func (cnc *CloudNodeController) processNextWorkItem() bool { // Run the syncHandler, passing it the key of the // Node resource to be synced. - if err := cnc.syncHandler(key); err != nil { + if err := cnc.syncHandler(ctx, key); err != nil { // Put the item back on the workqueue to handle any transient errors. cnc.workqueue.AddRateLimited(key) klog.Infof("error syncing '%s': %v, requeuing", key, err) @@ -245,14 +254,14 @@ func (cnc *CloudNodeController) processNextWorkItem() bool { } // syncHandler implements the logic of the controller. 
-func (cnc *CloudNodeController) syncHandler(key string) error { +func (cnc *CloudNodeController) syncHandler(ctx context.Context, key string) error { _, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key)) return nil } - return cnc.syncNode(context.TODO(), name) + return cnc.syncNode(ctx, name) } // UpdateNodeStatus updates the node status, such as node addresses @@ -456,7 +465,7 @@ func (cnc *CloudNodeController) syncNode(ctx context.Context, nodeName string) e modify(newNode) } - _, err = cnc.kubeClient.CoreV1().Nodes().Update(context.TODO(), newNode, metav1.UpdateOptions{}) + _, err = cnc.kubeClient.CoreV1().Nodes().Update(ctx, newNode, metav1.UpdateOptions{}) if err != nil { return err } diff --git a/staging/src/k8s.io/cloud-provider/controllers/node/node_controller_test.go b/staging/src/k8s.io/cloud-provider/controllers/node/node_controller_test.go index 7a1294f9552..20165bcccad 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/node/node_controller_test.go +++ b/staging/src/k8s.io/cloud-provider/controllers/node/node_controller_test.go @@ -28,6 +28,7 @@ import ( "k8s.io/client-go/kubernetes/fake" "k8s.io/client-go/kubernetes/scheme" "k8s.io/klog/v2" + "k8s.io/klog/v2/ktesting" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" @@ -1777,10 +1778,14 @@ func Test_syncNode(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + clientset := fake.NewSimpleClientset(test.existingNode) factory := informers.NewSharedInformerFactory(clientset, 0) - eventBroadcaster := record.NewBroadcaster() + eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx)) cloudNodeController := &CloudNodeController{ kubeClient: clientset, nodeInformer: factory.Core().V1().Nodes(), @@ -1799,12 +1804,12 @@ func Test_syncNode(t *testing.T) { w := eventBroadcaster.StartLogging(klog.Infof) defer w.Stop() - err := cloudNodeController.syncNode(context.TODO(), test.existingNode.Name) + err := cloudNodeController.syncNode(ctx, test.existingNode.Name) if (err != nil) != test.expectedErr { t.Fatalf("error got: %v expected: %v", err, test.expectedErr) } - updatedNode, err := clientset.CoreV1().Nodes().Get(context.TODO(), test.existingNode.Name, metav1.GetOptions{}) + updatedNode, err := clientset.CoreV1().Nodes().Get(ctx, test.existingNode.Name, metav1.GetOptions{}) if err != nil { t.Fatalf("error getting updated nodes: %v", err) } @@ -1884,6 +1889,11 @@ func Test_reconcileNodeLabels(t *testing.T) { for _, test := range testcases { t.Run(test.name, func(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + stopCh := ctx.Done() + testNode := &v1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: "node01", @@ -1899,9 +1909,6 @@ func Test_reconcileNodeLabels(t *testing.T) { nodeInformer: factory.Core().V1().Nodes(), } - stopCh := make(chan struct{}) - defer close(stopCh) - // activate node informer factory.Core().V1().Nodes().Informer() factory.Start(stopCh) @@ -1914,7 +1921,7 @@ func Test_reconcileNodeLabels(t *testing.T) { t.Errorf("unexpected error") } - actualNode, err := clientset.CoreV1().Nodes().Get(context.TODO(), "node01", metav1.GetOptions{}) + actualNode, err := clientset.CoreV1().Nodes().Get(ctx, "node01", metav1.GetOptions{}) if err != nil { t.Fatalf("error getting updated node: %v", err) } @@ -2062,6 +2069,10 @@ func 
TestNodeAddressesChangeDetected(t *testing.T) { // Test updateNodeAddress with instanceV2, same test case with TestNodeAddressesNotUpdate. func TestNodeAddressesNotUpdateV2(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + existingNode := &v1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: "node0", @@ -2119,13 +2130,13 @@ func TestNodeAddressesNotUpdateV2(t *testing.T) { cloud: fakeCloud, } - instanceMeta, err := cloudNodeController.getInstanceNodeAddresses(context.TODO(), existingNode) + instanceMeta, err := cloudNodeController.getInstanceNodeAddresses(ctx, existingNode) if err != nil { t.Errorf("get instance metadata with error %v", err) } - cloudNodeController.updateNodeAddress(context.TODO(), existingNode, instanceMeta) + cloudNodeController.updateNodeAddress(ctx, existingNode, instanceMeta) - updatedNode, err := clientset.CoreV1().Nodes().Get(context.TODO(), existingNode.Name, metav1.GetOptions{}) + updatedNode, err := clientset.CoreV1().Nodes().Get(ctx, existingNode.Name, metav1.GetOptions{}) if err != nil { t.Fatalf("error getting updated nodes: %v", err) } @@ -2138,6 +2149,10 @@ func TestNodeAddressesNotUpdateV2(t *testing.T) { // This test checks that a node with the external cloud provider taint is cloudprovider initialized and // and node addresses will not be updated when node isn't present according to the cloudprovider func TestNodeAddressesNotUpdate(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + existingNode := &v1.Node{ ObjectMeta: metav1.ObjectMeta{ Name: "node0", @@ -2195,13 +2210,13 @@ func TestNodeAddressesNotUpdate(t *testing.T) { cloud: fakeCloud, } - instanceMeta, err := cloudNodeController.getInstanceNodeAddresses(context.TODO(), existingNode) + instanceMeta, err := cloudNodeController.getInstanceNodeAddresses(ctx, existingNode) if err != nil { t.Errorf("get instance metadata with error %v", err) } - cloudNodeController.updateNodeAddress(context.TODO(), existingNode, instanceMeta) + cloudNodeController.updateNodeAddress(ctx, existingNode, instanceMeta) - updatedNode, err := clientset.CoreV1().Nodes().Get(context.TODO(), existingNode.Name, metav1.GetOptions{}) + updatedNode, err := clientset.CoreV1().Nodes().Get(ctx, existingNode.Name, metav1.GetOptions{}) if err != nil { t.Fatalf("error getting updated nodes: %v", err) } @@ -2514,11 +2529,13 @@ func TestGetInstanceMetadata(t *testing.T) { for _, test := range tests { t.Run(test.name, func(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + cloudNodeController := &CloudNodeController{ cloud: test.fakeCloud, } - metadata, err := cloudNodeController.getInstanceMetadata(context.TODO(), test.existingNode) + metadata, err := cloudNodeController.getInstanceMetadata(ctx, test.existingNode) if (err != nil) != test.expectErr { t.Fatalf("error expected %v got: %v", test.expectErr, err) } @@ -2574,6 +2591,10 @@ func TestUpdateNodeStatus(t *testing.T) { } for _, test := range tests { t.Run(test.name, func(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + fakeCloud := &fakecloud.Cloud{ EnableInstancesV2: false, Addresses: []v1.NodeAddress{ @@ -2600,7 +2621,7 @@ func TestUpdateNodeStatus(t *testing.T) { }) factory := informers.NewSharedInformerFactory(clientset, 0) - eventBroadcaster := record.NewBroadcaster() + eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx)) nodeInformer := factory.Core().V1().Nodes() nodeIndexer := 
nodeInformer.Informer().GetIndexer() cloudNodeController := &CloudNodeController{ @@ -2621,11 +2642,13 @@ func TestUpdateNodeStatus(t *testing.T) { } } - w := eventBroadcaster.StartLogging(klog.Infof) + w := eventBroadcaster.StartStructuredLogging(0) defer w.Stop() start := time.Now() - cloudNodeController.UpdateNodeStatus(context.TODO()) + if err := cloudNodeController.UpdateNodeStatus(ctx); err != nil { + t.Fatalf("error updating node status: %v", err) + } t.Logf("%d workers: processed %d nodes int %v ", test.workers, test.nodes, time.Since(start)) if len(fakeCloud.Calls) != test.nodes { t.Errorf("expected %d cloud-provider calls, got %d", test.nodes, len(fakeCloud.Calls)) diff --git a/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller.go b/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller.go index 0df8a4a07d7..2c1e63dae39 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller.go +++ b/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller.go @@ -21,7 +21,7 @@ import ( "errors" "time" - "k8s.io/api/core/v1" + v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" @@ -74,9 +74,6 @@ func NewCloudNodeLifecycleController( cloud cloudprovider.Interface, nodeMonitorPeriod time.Duration) (*CloudNodeLifecycleController, error) { - eventBroadcaster := record.NewBroadcaster() - recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-lifecycle-controller"}) - if kubeClient == nil { return nil, errors.New("kubernetes client is nil") } @@ -94,8 +91,6 @@ func NewCloudNodeLifecycleController( c := &CloudNodeLifecycleController{ kubeClient: kubeClient, nodeLister: nodeInformer.Lister(), - broadcaster: eventBroadcaster, - recorder: recorder, cloud: cloud, nodeMonitorPeriod: nodeMonitorPeriod, } @@ -106,6 +101,9 @@ func NewCloudNodeLifecycleController( // Run starts the main loop for this controller. 
Run is blocking so should // be called via a goroutine func (c *CloudNodeLifecycleController) Run(ctx context.Context, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) { + c.broadcaster = record.NewBroadcaster(record.WithContext(ctx)) + c.recorder = c.broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-lifecycle-controller"}) + defer utilruntime.HandleCrash() controllerManagerMetrics.ControllerStarted("cloud-node-lifecycle") defer controllerManagerMetrics.ControllerStopped("cloud-node-lifecycle") diff --git a/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller_test.go b/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller_test.go index b46a24436ad..1b21e201d09 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller_test.go +++ b/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller_test.go @@ -19,11 +19,12 @@ package cloud import ( "context" "errors" - "github.com/google/go-cmp/cmp" "reflect" "testing" "time" + "github.com/google/go-cmp/cmp" + v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -36,6 +37,7 @@ import ( cloudprovider "k8s.io/cloud-provider" fakecloud "k8s.io/cloud-provider/fake" "k8s.io/klog/v2" + "k8s.io/klog/v2/ktesting" ) func Test_NodesDeleted(t *testing.T) { @@ -510,17 +512,18 @@ func Test_NodesDeleted(t *testing.T) { for _, testcase := range testcases { t.Run(testcase.name, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() clientset := fake.NewSimpleClientset(testcase.existingNode) informer := informers.NewSharedInformerFactory(clientset, time.Second) nodeInformer := informer.Core().V1().Nodes() - if err := syncNodeStore(nodeInformer, clientset); err != nil { + if err := syncNodeStore(ctx, nodeInformer, clientset); err != nil { t.Errorf("unexpected error: %v", err) } - eventBroadcaster := record.NewBroadcaster() + eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx)) cloudNodeLifecycleController := &CloudNodeLifecycleController{ nodeLister: nodeInformer.Lister(), kubeClient: clientset, @@ -885,15 +888,19 @@ func Test_NodesShutdown(t *testing.T) { for _, testcase := range testcases { t.Run(testcase.name, func(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + clientset := fake.NewSimpleClientset(testcase.existingNode) informer := informers.NewSharedInformerFactory(clientset, time.Second) nodeInformer := informer.Core().V1().Nodes() - if err := syncNodeStore(nodeInformer, clientset); err != nil { + if err := syncNodeStore(ctx, nodeInformer, clientset); err != nil { t.Errorf("unexpected error: %v", err) } - eventBroadcaster := record.NewBroadcaster() + eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx)) cloudNodeLifecycleController := &CloudNodeLifecycleController{ nodeLister: nodeInformer.Lister(), kubeClient: clientset, @@ -902,11 +909,11 @@ func Test_NodesShutdown(t *testing.T) { nodeMonitorPeriod: 1 * time.Second, } - w := eventBroadcaster.StartLogging(klog.Infof) + w := eventBroadcaster.StartStructuredLogging(0) defer w.Stop() - cloudNodeLifecycleController.MonitorNodes(context.TODO()) + cloudNodeLifecycleController.MonitorNodes(ctx) - updatedNode, err := clientset.CoreV1().Nodes().Get(context.TODO(), 
testcase.existingNode.Name, metav1.GetOptions{}) + updatedNode, err := clientset.CoreV1().Nodes().Get(ctx, testcase.existingNode.Name, metav1.GetOptions{}) if testcase.expectedDeleted != apierrors.IsNotFound(err) { t.Fatalf("unexpected error happens when getting the node: %v", err) } @@ -1047,11 +1054,13 @@ func Test_GetProviderID(t *testing.T) { for _, testcase := range testcases { t.Run(testcase.name, func(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + cloudNodeLifecycleController := &CloudNodeLifecycleController{ cloud: testcase.fakeCloud, } - providerID, err := cloudNodeLifecycleController.getProviderID(context.TODO(), testcase.existingNode) + providerID, err := cloudNodeLifecycleController.getProviderID(ctx, testcase.existingNode) if err != nil && testcase.expectedErr == nil { t.Fatalf("unexpected error: %v", err) @@ -1070,8 +1079,8 @@ func Test_GetProviderID(t *testing.T) { } } -func syncNodeStore(nodeinformer coreinformers.NodeInformer, f *fake.Clientset) error { - nodes, err := f.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{}) +func syncNodeStore(ctx context.Context, nodeinformer coreinformers.NodeInformer, f *fake.Clientset) error { + nodes, err := f.CoreV1().Nodes().List(ctx, metav1.ListOptions{}) if err != nil { return err } diff --git a/staging/src/k8s.io/cloud-provider/controllers/route/route_controller.go b/staging/src/k8s.io/cloud-provider/controllers/route/route_controller.go index 072e860b0f9..d6e5ab3b800 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/route/route_controller.go +++ b/staging/src/k8s.io/cloud-provider/controllers/route/route_controller.go @@ -74,9 +74,6 @@ func New(routes cloudprovider.Routes, kubeClient clientset.Interface, nodeInform klog.Fatal("RouteController: Must specify clusterCIDR.") } - eventBroadcaster := record.NewBroadcaster() - recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "route_controller"}) - rc := &RouteController{ routes: routes, kubeClient: kubeClient, @@ -84,8 +81,6 @@ func New(routes cloudprovider.Routes, kubeClient clientset.Interface, nodeInform clusterCIDRs: clusterCIDRs, nodeLister: nodeInformer.Lister(), nodeListerSynced: nodeInformer.Informer().HasSynced, - broadcaster: eventBroadcaster, - recorder: recorder, } return rc @@ -94,6 +89,9 @@ func New(routes cloudprovider.Routes, kubeClient clientset.Interface, nodeInform func (rc *RouteController) Run(ctx context.Context, syncPeriod time.Duration, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) { defer utilruntime.HandleCrash() + rc.broadcaster = record.NewBroadcaster(record.WithContext(ctx)) + rc.recorder = rc.broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "route_controller"}) + // Start event processing pipeline. 
if rc.broadcaster != nil { rc.broadcaster.StartStructuredLogging(0) diff --git a/staging/src/k8s.io/cloud-provider/controllers/route/route_controller_test.go b/staging/src/k8s.io/cloud-provider/controllers/route/route_controller_test.go index 03bf423a6e5..01e6852863b 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/route/route_controller_test.go +++ b/staging/src/k8s.io/cloud-provider/controllers/route/route_controller_test.go @@ -31,6 +31,7 @@ import ( cloudprovider "k8s.io/cloud-provider" fakecloud "k8s.io/cloud-provider/fake" nodeutil "k8s.io/component-helpers/node/util" + "k8s.io/klog/v2/ktesting" netutils "k8s.io/utils/net" "github.com/stretchr/testify/assert" @@ -415,7 +416,8 @@ func TestReconcile(t *testing.T) { } for _, testCase := range testCases { t.Run(testCase.description, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() cloud := &fakecloud.Cloud{RouteMap: make(map[string]*fakecloud.Route)} for _, route := range testCase.initialRoutes { diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go index f731e0c7a86..dde3c36c5dd 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go +++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go @@ -109,17 +109,12 @@ func New( clusterName string, featureGate featuregate.FeatureGate, ) (*Controller, error) { - broadcaster := record.NewBroadcaster() - recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "service-controller"}) - registerMetrics() s := &Controller{ cloud: cloud, kubeClient: kubeClient, clusterName: clusterName, cache: &serviceCache{serviceMap: make(map[string]*cachedService)}, - eventBroadcaster: broadcaster, - eventRecorder: recorder, nodeLister: nodeInformer.Lister(), nodeListerSynced: nodeInformer.Informer().HasSynced, serviceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"), @@ -219,6 +214,9 @@ func (c *Controller) enqueueNode(obj interface{}) { // It's an error to call Run() more than once for a given ServiceController // object. func (c *Controller) Run(ctx context.Context, workers int, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) { + c.eventBroadcaster = record.NewBroadcaster(record.WithContext(ctx)) + c.eventRecorder = c.eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "service-controller"}) + defer runtime.HandleCrash() defer c.serviceQueue.ShutDown() defer c.nodeQueue.ShutDown() diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go index 80ac218683c..fa30e389467 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go +++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go @@ -51,6 +51,7 @@ import ( fakecloud "k8s.io/cloud-provider/fake" servicehelper "k8s.io/cloud-provider/service/helpers" _ "k8s.io/controller-manager/pkg/features/register" + "k8s.io/klog/v2/ktesting" utilpointer "k8s.io/utils/pointer" ) @@ -150,7 +151,8 @@ func defaultExternalService() *v1.Service { // node/service informers and reacting to resource events. 
Callers can also // specify `objects` which represent the initial state of objects, used to // populate the client set / informer cache at start-up. -func newController(stopCh <-chan struct{}, objects ...runtime.Object) (*Controller, *fakecloud.Cloud, *fake.Clientset) { +func newController(ctx context.Context, objects ...runtime.Object) (*Controller, *fakecloud.Cloud, *fake.Clientset) { + stopCh := ctx.Done() cloud := &fakecloud.Cloud{} cloud.Region = region @@ -158,7 +160,7 @@ func newController(stopCh <-chan struct{}, objects ...runtime.Object) (*Controll informerFactory := informers.NewSharedInformerFactory(kubeClient, 0) serviceInformer := informerFactory.Core().V1().Services() nodeInformer := informerFactory.Core().V1().Nodes() - broadcaster := record.NewBroadcaster() + broadcaster := record.NewBroadcaster(record.WithContext(ctx)) broadcaster.StartStructuredLogging(0) broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")}) recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "service-controller"}) @@ -275,9 +277,10 @@ func TestSyncLoadBalancerIfNeeded(t *testing.T) { for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() - controller, cloud, client := newController(nil) + controller, cloud, client := newController(ctx) cloud.Exists = tc.lbExists key := fmt.Sprintf("%s/%s", tc.service.Namespace, tc.service.Name) if _, err := client.CoreV1().Services(tc.service.Namespace).Create(ctx, tc.service, metav1.CreateOptions{}); err != nil { @@ -321,7 +324,7 @@ func TestSyncLoadBalancerIfNeeded(t *testing.T) { } for _, balancer := range cloud.Balancers { - if balancer.Name != controller.balancer.GetLoadBalancerName(context.Background(), "", tc.service) || + if balancer.Name != controller.balancer.GetLoadBalancerName(ctx, "", tc.service) || balancer.Region != region || balancer.Ports[0].Port != tc.service.Spec.Ports[0].Port { t.Errorf("Created load balancer has incorrect parameters: %v", balancer) @@ -449,9 +452,10 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) { }} for _, item := range table { t.Run(item.desc, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() - controller, cloud, _ := newController(nil) + controller, cloud, _ := newController(ctx) controller.nodeLister = newFakeNodeLister(nil, nodes...) 
if servicesToRetry := controller.updateLoadBalancerHosts(ctx, item.services, item.workers); len(servicesToRetry) != 0 { t.Errorf("for case %q, unexpected servicesToRetry: %v", item.desc, servicesToRetry) @@ -607,11 +611,12 @@ func TestNodeChangesForStableNodeSetEnabled(t *testing.T) { expectedUpdateCalls: []fakecloud.UpdateBalancerCall{}, }} { t.Run(tc.desc, func(t *testing.T) { - controller, cloud, _ := newController(nil) - - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() + controller, cloud, _ := newController(ctx) + for _, svc := range services { key, _ := cache.MetaNamespaceKeyFunc(svc) controller.lastSyncedNodes[key] = tc.initialState @@ -638,6 +643,10 @@ func TestNodeChangesForStableNodeSetEnabled(t *testing.T) { } func TestNodeChangesInExternalLoadBalancer(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + node1 := makeNode(tweakName("node1")) node2 := makeNode(tweakName("node2")) node3 := makeNode(tweakName("node3")) @@ -655,7 +664,7 @@ func TestNodeChangesInExternalLoadBalancer(t *testing.T) { serviceNames.Insert(fmt.Sprintf("%s/%s", svc.GetObjectMeta().GetNamespace(), svc.GetObjectMeta().GetName())) } - controller, cloud, _ := newController(nil) + controller, cloud, _ := newController(ctx) for _, tc := range []struct { desc string nodes []*v1.Node @@ -715,7 +724,8 @@ func TestNodeChangesInExternalLoadBalancer(t *testing.T) { expectedRetryServices: serviceNames, }} { t.Run(tc.desc, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() controller.nodeLister = newFakeNodeLister(tc.nodeListerErr, tc.nodes...) servicesToRetry := controller.updateLoadBalancerHosts(ctx, services, tc.worker) @@ -771,7 +781,11 @@ func compareHostSets(t *testing.T, left, right []*v1.Node) bool { } func TestNodesNotEqual(t *testing.T) { - controller, cloud, _ := newController(nil) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + controller, cloud, _ := newController(ctx) services := []*v1.Service{ newService("s0", v1.ServiceTypeLoadBalancer), @@ -818,7 +832,8 @@ func TestNodesNotEqual(t *testing.T) { for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() controller.nodeLister = newFakeNodeLister(nil, tc.newNodes...) 
@@ -835,7 +850,11 @@ func TestNodesNotEqual(t *testing.T) { } func TestProcessServiceCreateOrUpdate(t *testing.T) { - controller, _, client := newController(nil) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + controller, _, client := newController(ctx) //A pair of old and new loadbalancer IP address oldLBIP := "192.168.1.1" @@ -908,7 +927,8 @@ func TestProcessServiceCreateOrUpdate(t *testing.T) { }} for _, tc := range testCases { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() newSvc := tc.updateFn(tc.svc) if _, err := client.CoreV1().Services(tc.svc.Namespace).Create(ctx, tc.svc, metav1.CreateOptions{}); err != nil { @@ -945,12 +965,13 @@ func TestProcessServiceCreateOrUpdateK8sError(t *testing.T) { for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() svc := newService(svcName, v1.ServiceTypeLoadBalancer) // Preset finalizer so k8s error only happens when patching status. svc.Finalizers = []string{servicehelper.LoadBalancerCleanupFinalizer} - controller, _, client := newController(nil) + controller, _, client := newController(ctx) client.PrependReactor("patch", "services", func(action core.Action) (bool, runtime.Object, error) { return true, nil, tc.k8sErr }) @@ -987,14 +1008,14 @@ func TestSyncService(t *testing.T) { testCases := []struct { testName string key string - updateFn func() //Function to manipulate the controller element to simulate error - expectedFn func(error) error //Expected function if returns nil then test passed, failed otherwise + updateFn func(context.Context) // Function to manipulate the controller element to simulate error + expectedFn func(error) error // Expected function if returns nil then test passed, failed otherwise }{ { testName: "if an invalid service name is synced", key: "invalid/key/string", - updateFn: func() { - controller, _, _ = newController(nil) + updateFn: func(ctx context.Context) { + controller, _, _ = newController(ctx) }, expectedFn: func(e error) error { //TODO: should find a way to test for dependent package errors in such a way that it won't break @@ -1024,9 +1045,9 @@ func TestSyncService(t *testing.T) { { testName: "if valid service", key: "external-balancer", - updateFn: func() { + updateFn: func(ctx context.Context) { testSvc := defaultExternalService() - controller, _, _ = newController(nil) + controller, _, _ = newController(ctx) controller.enqueueService(testSvc) svc := controller.cache.getOrCreate("external-balancer") svc.state = testSvc @@ -1043,10 +1064,11 @@ func TestSyncService(t *testing.T) { for _, tc := range testCases { t.Run(tc.testName, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() - tc.updateFn() + tc.updateFn(ctx) obtainedErr := controller.syncService(ctx, tc.key) //expected matches obtained ??. @@ -1128,11 +1150,12 @@ func TestProcessServiceDeletion(t *testing.T) { }} for _, tc := range testCases { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() //Create a new controller. 
- controller, cloud, _ = newController(nil) + controller, cloud, _ = newController(ctx) tc.updateFn(controller) obtainedErr := controller.processServiceDeletion(ctx, svcKey) if err := tc.expectedFn(obtainedErr); err != nil { @@ -1213,8 +1236,10 @@ func TestNeedsCleanup(t *testing.T) { // make sure that the slow node sync never removes the Node from LB set because it // has stale data. func TestSlowNodeSync(t *testing.T) { - stopCh, syncServiceDone, syncService := make(chan struct{}), make(chan string), make(chan string) - defer close(stopCh) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + syncServiceDone, syncService := make(chan string), make(chan string) defer close(syncService) node1 := makeNode(tweakName("node1")) @@ -1227,7 +1252,7 @@ func TestSlowNodeSync(t *testing.T) { sKey2, _ := cache.MetaNamespaceKeyFunc(service2) serviceKeys := sets.New(sKey1, sKey2) - controller, cloudProvider, kubeClient := newController(stopCh, node1, node2, service1, service2) + controller, cloudProvider, kubeClient := newController(ctx, node1, node2, service1, service2) cloudProvider.UpdateCallCb = func(update fakecloud.UpdateBalancerCall) { key, _ := cache.MetaNamespaceKeyFunc(update.Service) impactedService := serviceKeys.Difference(sets.New(key)).UnsortedList()[0] @@ -1263,17 +1288,17 @@ func TestSlowNodeSync(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - controller.syncNodes(context.TODO(), 1) + controller.syncNodes(ctx, 1) }() key := <-syncService - if _, err := kubeClient.CoreV1().Nodes().Create(context.TODO(), node3, metav1.CreateOptions{}); err != nil { + if _, err := kubeClient.CoreV1().Nodes().Create(ctx, node3, metav1.CreateOptions{}); err != nil { t.Fatalf("error creating node3, err: %v", err) } // Allow a bit of time for the informer cache to get populated with the new // node - if err := wait.PollUntilContextCancel(wait.ContextForChannel(stopCh), 10*time.Millisecond, true, func(ctx context.Context) (done bool, err error) { + if err := wait.PollUntilContextCancel(ctx, 10*time.Millisecond, true, func(ctx context.Context) (done bool, err error) { n3, _ := controller.nodeLister.Get("node3") return n3 != nil, nil }); err != nil { @@ -1281,7 +1306,7 @@ func TestSlowNodeSync(t *testing.T) { } // Sync the service - if err := controller.syncService(context.TODO(), key); err != nil { + if err := controller.syncService(ctx, key); err != nil { t.Fatalf("unexpected service sync error, err: %v", err) } @@ -1465,13 +1490,18 @@ func TestNeedsUpdate(t *testing.T) { expectedNeedsUpdate: true, }} - controller, _, _ := newController(nil) for _, tc := range testCases { - oldSvc, newSvc := tc.updateFn() - obtainedResult := controller.needsUpdate(oldSvc, newSvc) - if obtainedResult != tc.expectedNeedsUpdate { - t.Errorf("%v needsUpdate() should have returned %v but returned %v", tc.testName, tc.expectedNeedsUpdate, obtainedResult) - } + t.Run(tc.testName, func(t *testing.T) { + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + controller, _, _ := newController(ctx) + oldSvc, newSvc := tc.updateFn() + obtainedResult := controller.needsUpdate(oldSvc, newSvc) + if obtainedResult != tc.expectedNeedsUpdate { + t.Errorf("%v needsUpdate() should have returned %v but returned %v", tc.testName, tc.expectedNeedsUpdate, obtainedResult) + } + }) } } @@ -1606,7 +1636,8 @@ func TestAddFinalizer(t *testing.T) { for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - ctx, cancel := 
context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() c := fake.NewSimpleClientset() s := &Controller{ @@ -1659,7 +1690,8 @@ func TestRemoveFinalizer(t *testing.T) { for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() c := fake.NewSimpleClientset() s := &Controller{ @@ -1756,7 +1788,8 @@ func TestPatchStatus(t *testing.T) { for _, tc := range testCases { t.Run(tc.desc, func(t *testing.T) { - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() c := fake.NewSimpleClientset() s := &Controller{ @@ -2277,7 +2310,10 @@ func TestServiceQueueDelay(t *testing.T) { for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { - controller, cloud, client := newController(nil) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + controller, cloud, client := newController(ctx) queue := &spyWorkQueue{RateLimitingInterface: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test-service-queue-delay")} controller.serviceQueue = queue cloud.Err = tc.lbCloudErr @@ -2290,7 +2326,6 @@ func TestServiceQueueDelay(t *testing.T) { t.Fatalf("adding service %s to cache: %s", svc.Name, err) } - ctx := context.Background() _, err := client.CoreV1().Services(ns).Create(ctx, svc, metav1.CreateOptions{}) if err != nil { t.Fatal(err) diff --git a/staging/src/k8s.io/cloud-provider/options/options.go b/staging/src/k8s.io/cloud-provider/options/options.go index 15be5a86c07..b96948106ca 100644 --- a/staging/src/k8s.io/cloud-provider/options/options.go +++ b/staging/src/k8s.io/cloud-provider/options/options.go @@ -17,6 +17,7 @@ limitations under the License. package options import ( + "context" "fmt" "math/rand" "net" @@ -221,7 +222,7 @@ func (o *CloudControllerManagerOptions) ApplyTo(c *config.Config, allControllers return err } - c.EventBroadcaster = record.NewBroadcaster() + c.EventBroadcaster = record.NewBroadcaster(record.WithContext(context.TODO())) // TODO: move broadcaster construction to a place where there is a proper context. c.EventRecorder = c.EventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: userAgent}) rootClientBuilder := clientbuilder.SimpleControllerClientBuilder{
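
The Run/RunWithContext split in node_controller.go follows the usual transition pattern for contextual logging; below is a rough, self-contained sketch with a hypothetical controller type (not the actual CloudNodeController code): the old channel-based Run stays as a thin wrapper, and everything periodic derives from the one ctx.

package examplectrl

import (
	"context"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

type controller struct{}

// Run keeps the channel-based signature for callers that have not switched
// to contexts yet and simply wraps the stop channel.
//
//logcheck:context // RunWithContext should be used instead of Run in code which supports contextual logging.
func (c *controller) Run(stopCh <-chan struct{}) {
	c.RunWithContext(wait.ContextForChannel(stopCh))
}

// RunWithContext is the context-aware entry point; the periodic worker is
// driven by ctx, so canceling it stops the loop.
func (c *controller) RunWithContext(ctx context.Context) {
	wait.UntilWithContext(ctx, c.sync, time.Second)
}

func (c *controller) sync(ctx context.Context) {
	// Pass ctx down to client-go calls here instead of context.TODO().
}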
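For reviewers, a minimal sketch of the test pattern this patch converges on; the test name, component string, and node object are illustrative and not taken from the patch. The point is that the ktesting context bounds the broadcaster's goroutines (nothing leaks once the test's context is canceled) and routes event and log output through the per-test logger.

package exampletest

import (
	"context"
	"testing"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/tools/record"
	"k8s.io/klog/v2/ktesting"
)

func TestEventContext(t *testing.T) {
	// Per-test logger and context; canceling ctx shuts down the broadcaster's goroutines.
	_, ctx := ktesting.NewTestContext(t)
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	broadcaster := record.NewBroadcaster(record.WithContext(ctx))
	w := broadcaster.StartStructuredLogging(0)
	defer w.Stop()
	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "example-controller"})

	// Events recorded here show up in the per-test log output.
	node := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node0"}}
	recorder.Event(node, v1.EventTypeNormal, "ExampleReason", "recorded against the test context")
}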