diff --git a/staging/src/k8s.io/cloud-provider/controllers/node/node_controller.go b/staging/src/k8s.io/cloud-provider/controllers/node/node_controller.go
index 45cbd16429b..07c8d23abd4 100644
--- a/staging/src/k8s.io/cloud-provider/controllers/node/node_controller.go
+++ b/staging/src/k8s.io/cloud-provider/controllers/node/node_controller.go
@@ -120,9 +120,6 @@ func NewCloudNodeController(
 	nodeStatusUpdateFrequency time.Duration,
 	workerCount int32) (*CloudNodeController, error) {
 
-	eventBroadcaster := record.NewBroadcaster()
-	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"})
-
 	_, instancesSupported := cloud.Instances()
 	_, instancesV2Supported := cloud.InstancesV2()
 	if !instancesSupported && !instancesV2Supported {
@@ -132,8 +129,6 @@ func NewCloudNodeController(
 	cnc := &CloudNodeController{
 		nodeInformer:              nodeInformer,
 		kubeClient:                kubeClient,
-		broadcaster:               eventBroadcaster,
-		recorder:                  recorder,
 		cloud:                     cloud,
 		nodeStatusUpdateFrequency: nodeStatusUpdateFrequency,
 		workerCount:               workerCount,
@@ -156,7 +151,21 @@ func NewCloudNodeController(
 // This controller updates newly registered nodes with information
 // from the cloud provider. This call is blocking so should be called
 // via a goroutine
+//
+//logcheck:context // RunWithContext should be used instead of Run in code which supports contextual logging.
 func (cnc *CloudNodeController) Run(stopCh <-chan struct{}, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) {
+	cnc.RunWithContext(wait.ContextForChannel(stopCh), controllerManagerMetrics)
+}
+
+// RunWithContext will sync informer caches and starting workers.
+// This controller updates newly registered nodes with information
+// from the cloud provider. This call is blocking so should be called
+// via a goroutine
+func (cnc *CloudNodeController) RunWithContext(ctx context.Context, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) {
+	cnc.broadcaster = record.NewBroadcaster(record.WithContext(ctx))
+	cnc.recorder = cnc.broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"})
+	stopCh := ctx.Done()
+
 	defer utilruntime.HandleCrash()
 	defer cnc.workqueue.ShutDown()
 
@@ -178,16 +187,16 @@ func (cnc *CloudNodeController) Run(stopCh <-chan struct{}, controllerManagerMet
 
 	// The periodic loop for updateNodeStatus polls the Cloud Provider periodically
 	// to reconcile the nodes addresses and labels.
-	go wait.Until(func() {
-		if err := cnc.UpdateNodeStatus(context.TODO()); err != nil {
+	go wait.UntilWithContext(ctx, func(ctx context.Context) {
+		if err := cnc.UpdateNodeStatus(ctx); err != nil {
 			klog.Errorf("failed to update node status: %v", err)
 		}
-	}, cnc.nodeStatusUpdateFrequency, stopCh)
+	}, cnc.nodeStatusUpdateFrequency)
 
 	// These workers initialize the nodes added to the cluster,
 	// those that are Tainted with TaintExternalCloudProvider.
 	for i := int32(0); i < cnc.workerCount; i++ {
-		go wait.Until(cnc.runWorker, time.Second, stopCh)
+		go wait.UntilWithContext(ctx, cnc.runWorker, time.Second)
 	}
 
 	<-stopCh
@@ -196,14 +205,14 @@ func (cnc *CloudNodeController) Run(stopCh <-chan struct{}, controllerManagerMet
 // runWorker is a long-running function that will continually call the
 // processNextWorkItem function in order to read and process a message on the
 // workqueue.
-func (cnc *CloudNodeController) runWorker() {
-	for cnc.processNextWorkItem() {
+func (cnc *CloudNodeController) runWorker(ctx context.Context) {
+	for cnc.processNextWorkItem(ctx) {
 	}
 }
 
 // processNextWorkItem will read a single work item off the workqueue and
 // attempt to process it, by calling the syncHandler.
-func (cnc *CloudNodeController) processNextWorkItem() bool {
+func (cnc *CloudNodeController) processNextWorkItem(ctx context.Context) bool {
 	obj, shutdown := cnc.workqueue.Get()
 	if shutdown {
 		return false
@@ -223,7 +232,7 @@ func (cnc *CloudNodeController) processNextWorkItem() bool {
 
 		// Run the syncHandler, passing it the key of the
 		// Node resource to be synced.
-		if err := cnc.syncHandler(key); err != nil {
+		if err := cnc.syncHandler(ctx, key); err != nil {
 			// Put the item back on the workqueue to handle any transient errors.
 			cnc.workqueue.AddRateLimited(key)
 			klog.Infof("error syncing '%s': %v, requeuing", key, err)
@@ -245,14 +254,14 @@ func (cnc *CloudNodeController) processNextWorkItem() bool {
 }
 
 // syncHandler implements the logic of the controller.
-func (cnc *CloudNodeController) syncHandler(key string) error {
+func (cnc *CloudNodeController) syncHandler(ctx context.Context, key string) error {
 	_, name, err := cache.SplitMetaNamespaceKey(key)
 	if err != nil {
 		utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
 		return nil
 	}
 
-	return cnc.syncNode(context.TODO(), name)
+	return cnc.syncNode(ctx, name)
 }
 
 // UpdateNodeStatus updates the node status, such as node addresses
@@ -456,7 +465,7 @@ func (cnc *CloudNodeController) syncNode(ctx context.Context, nodeName string) e
 		modify(newNode)
 	}
 
-	_, err = cnc.kubeClient.CoreV1().Nodes().Update(context.TODO(), newNode, metav1.UpdateOptions{})
+	_, err = cnc.kubeClient.CoreV1().Nodes().Update(ctx, newNode, metav1.UpdateOptions{})
 	if err != nil {
 		return err
 	}
diff --git a/staging/src/k8s.io/cloud-provider/controllers/node/node_controller_test.go b/staging/src/k8s.io/cloud-provider/controllers/node/node_controller_test.go
index 7a1294f9552..20165bcccad 100644
--- a/staging/src/k8s.io/cloud-provider/controllers/node/node_controller_test.go
+++ b/staging/src/k8s.io/cloud-provider/controllers/node/node_controller_test.go
@@ -28,6 +28,7 @@ import (
 	"k8s.io/client-go/kubernetes/fake"
 	"k8s.io/client-go/kubernetes/scheme"
 	"k8s.io/klog/v2"
+	"k8s.io/klog/v2/ktesting"
 
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
@@ -1777,10 +1778,14 @@ func Test_syncNode(t *testing.T) {
 
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
+			_, ctx := ktesting.NewTestContext(t)
+			ctx, cancel := context.WithCancel(ctx)
+			defer cancel()
+
 			clientset := fake.NewSimpleClientset(test.existingNode)
 			factory := informers.NewSharedInformerFactory(clientset, 0)
 
-			eventBroadcaster := record.NewBroadcaster()
+			eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
 			cloudNodeController := &CloudNodeController{
 				kubeClient:   clientset,
 				nodeInformer: factory.Core().V1().Nodes(),
@@ -1799,12 +1804,12 @@ func Test_syncNode(t *testing.T) {
 			w := eventBroadcaster.StartLogging(klog.Infof)
 			defer w.Stop()
 
-			err := cloudNodeController.syncNode(context.TODO(), test.existingNode.Name)
+			err := cloudNodeController.syncNode(ctx, test.existingNode.Name)
 			if (err != nil) != test.expectedErr {
 				t.Fatalf("error got: %v expected: %v", err, test.expectedErr)
 			}
 
-			updatedNode, err := clientset.CoreV1().Nodes().Get(context.TODO(), test.existingNode.Name, metav1.GetOptions{})
+			updatedNode, err := clientset.CoreV1().Nodes().Get(ctx, test.existingNode.Name, metav1.GetOptions{})
 			if err != nil {
 				t.Fatalf("error getting updated nodes: %v", err)
 			}
@@ -1884,6 +1889,11 @@ func Test_reconcileNodeLabels(t *testing.T) {
 
 	for _, test := range testcases {
 		t.Run(test.name, func(t *testing.T) {
+			_, ctx := ktesting.NewTestContext(t)
+			ctx, cancel := context.WithCancel(ctx)
+			defer cancel()
+			stopCh := ctx.Done()
+
 			testNode := &v1.Node{
 				ObjectMeta: metav1.ObjectMeta{
 					Name: "node01",
@@ -1899,9 +1909,6 @@ func Test_reconcileNodeLabels(t *testing.T) {
 				nodeInformer: factory.Core().V1().Nodes(),
 			}
 
-			stopCh := make(chan struct{})
-			defer close(stopCh)
-
 			// activate node informer
 			factory.Core().V1().Nodes().Informer()
 			factory.Start(stopCh)
@@ -1914,7 +1921,7 @@ func Test_reconcileNodeLabels(t *testing.T) {
 				t.Errorf("unexpected error")
 			}
 
-			actualNode, err := clientset.CoreV1().Nodes().Get(context.TODO(), "node01", metav1.GetOptions{})
+			actualNode, err := clientset.CoreV1().Nodes().Get(ctx, "node01", metav1.GetOptions{})
 			if err != nil {
 				t.Fatalf("error getting updated node: %v", err)
 			}
@@ -2062,6 +2069,10 @@ func TestNodeAddressesChangeDetected(t *testing.T) {
 
 // Test updateNodeAddress with instanceV2, same test case with TestNodeAddressesNotUpdate.
 func TestNodeAddressesNotUpdateV2(t *testing.T) {
+	_, ctx := ktesting.NewTestContext(t)
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
 	existingNode := &v1.Node{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: "node0",
@@ -2119,13 +2130,13 @@ func TestNodeAddressesNotUpdateV2(t *testing.T) {
 		cloud:      fakeCloud,
 	}
 
-	instanceMeta, err := cloudNodeController.getInstanceNodeAddresses(context.TODO(), existingNode)
+	instanceMeta, err := cloudNodeController.getInstanceNodeAddresses(ctx, existingNode)
 	if err != nil {
 		t.Errorf("get instance metadata with error %v", err)
 	}
-	cloudNodeController.updateNodeAddress(context.TODO(), existingNode, instanceMeta)
+	cloudNodeController.updateNodeAddress(ctx, existingNode, instanceMeta)
 
-	updatedNode, err := clientset.CoreV1().Nodes().Get(context.TODO(), existingNode.Name, metav1.GetOptions{})
+	updatedNode, err := clientset.CoreV1().Nodes().Get(ctx, existingNode.Name, metav1.GetOptions{})
 	if err != nil {
 		t.Fatalf("error getting updated nodes: %v", err)
 	}
@@ -2138,6 +2149,10 @@ func TestNodeAddressesNotUpdateV2(t *testing.T) {
 // This test checks that a node with the external cloud provider taint is cloudprovider initialized and
 // and node addresses will not be updated when node isn't present according to the cloudprovider
 func TestNodeAddressesNotUpdate(t *testing.T) {
+	_, ctx := ktesting.NewTestContext(t)
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
 	existingNode := &v1.Node{
 		ObjectMeta: metav1.ObjectMeta{
 			Name: "node0",
@@ -2195,13 +2210,13 @@ func TestNodeAddressesNotUpdate(t *testing.T) {
 		cloud:      fakeCloud,
 	}
 
-	instanceMeta, err := cloudNodeController.getInstanceNodeAddresses(context.TODO(), existingNode)
+	instanceMeta, err := cloudNodeController.getInstanceNodeAddresses(ctx, existingNode)
 	if err != nil {
 		t.Errorf("get instance metadata with error %v", err)
 	}
-	cloudNodeController.updateNodeAddress(context.TODO(), existingNode, instanceMeta)
+	cloudNodeController.updateNodeAddress(ctx, existingNode, instanceMeta)
 
-	updatedNode, err := clientset.CoreV1().Nodes().Get(context.TODO(), existingNode.Name, metav1.GetOptions{})
+	updatedNode, err := clientset.CoreV1().Nodes().Get(ctx, existingNode.Name, metav1.GetOptions{})
 	if err != nil {
 		t.Fatalf("error getting updated nodes: %v", err)
 	}
@@ -2514,11 +2529,13 @@ func TestGetInstanceMetadata(t *testing.T) {
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
+			_, ctx := ktesting.NewTestContext(t)
+
 			cloudNodeController := &CloudNodeController{
 				cloud: test.fakeCloud,
 			}
 
-			metadata, err := cloudNodeController.getInstanceMetadata(context.TODO(), test.existingNode)
+			metadata, err := cloudNodeController.getInstanceMetadata(ctx, test.existingNode)
 			if (err != nil) != test.expectErr {
 				t.Fatalf("error expected %v got: %v", test.expectErr, err)
 			}
@@ -2574,6 +2591,10 @@ func TestUpdateNodeStatus(t *testing.T) {
 	}
 	for _, test := range tests {
 		t.Run(test.name, func(t *testing.T) {
+			_, ctx := ktesting.NewTestContext(t)
+			ctx, cancel := context.WithCancel(ctx)
+			defer cancel()
+
 			fakeCloud := &fakecloud.Cloud{
 				EnableInstancesV2: false,
 				Addresses: []v1.NodeAddress{
@@ -2600,7 +2621,7 @@ func TestUpdateNodeStatus(t *testing.T) {
 			})
 
 			factory := informers.NewSharedInformerFactory(clientset, 0)
-			eventBroadcaster := record.NewBroadcaster()
+			eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
 			nodeInformer := factory.Core().V1().Nodes()
 			nodeIndexer := nodeInformer.Informer().GetIndexer()
 			cloudNodeController := &CloudNodeController{
@@ -2621,11 +2642,13 @@ func TestUpdateNodeStatus(t *testing.T) {
 				}
 			}
 
-			w := eventBroadcaster.StartLogging(klog.Infof)
+			w := eventBroadcaster.StartStructuredLogging(0)
 			defer w.Stop()
 
 			start := time.Now()
-			cloudNodeController.UpdateNodeStatus(context.TODO())
+			if err := cloudNodeController.UpdateNodeStatus(ctx); err != nil {
+				t.Fatalf("error updating node status: %v", err)
+			}
 			t.Logf("%d workers: processed %d nodes int %v ", test.workers, test.nodes, time.Since(start))
 			if len(fakeCloud.Calls) != test.nodes {
 				t.Errorf("expected %d cloud-provider calls, got %d", test.nodes, len(fakeCloud.Calls))
diff --git a/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller.go b/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller.go
index 0df8a4a07d7..2c1e63dae39 100644
--- a/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller.go
+++ b/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller.go
@@ -21,7 +21,7 @@ import (
 	"errors"
 	"time"
 
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/labels"
 	"k8s.io/apimachinery/pkg/types"
@@ -74,9 +74,6 @@ func NewCloudNodeLifecycleController(
 	cloud cloudprovider.Interface,
 	nodeMonitorPeriod time.Duration) (*CloudNodeLifecycleController, error) {
 
-	eventBroadcaster := record.NewBroadcaster()
-	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-lifecycle-controller"})
-
 	if kubeClient == nil {
 		return nil, errors.New("kubernetes client is nil")
 	}
@@ -94,8 +91,6 @@ func NewCloudNodeLifecycleController(
 	c := &CloudNodeLifecycleController{
 		kubeClient:        kubeClient,
 		nodeLister:        nodeInformer.Lister(),
-		broadcaster:       eventBroadcaster,
-		recorder:          recorder,
 		cloud:             cloud,
 		nodeMonitorPeriod: nodeMonitorPeriod,
 	}
@@ -106,6 +101,9 @@ func NewCloudNodeLifecycleController(
 // Run starts the main loop for this controller. Run is blocking so should
 // be called via a goroutine
 func (c *CloudNodeLifecycleController) Run(ctx context.Context, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) {
+	c.broadcaster = record.NewBroadcaster(record.WithContext(ctx))
+	c.recorder = c.broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-lifecycle-controller"})
+
 	defer utilruntime.HandleCrash()
 	controllerManagerMetrics.ControllerStarted("cloud-node-lifecycle")
 	defer controllerManagerMetrics.ControllerStopped("cloud-node-lifecycle")
diff --git a/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller_test.go b/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller_test.go
index b46a24436ad..1b21e201d09 100644
--- a/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller_test.go
+++ b/staging/src/k8s.io/cloud-provider/controllers/nodelifecycle/node_lifecycle_controller_test.go
@@ -19,11 +19,12 @@ package cloud
 import (
 	"context"
 	"errors"
-	"github.com/google/go-cmp/cmp"
 	"reflect"
 	"testing"
 	"time"
 
+	"github.com/google/go-cmp/cmp"
+
 	v1 "k8s.io/api/core/v1"
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -36,6 +37,7 @@ import (
 	cloudprovider "k8s.io/cloud-provider"
 	fakecloud "k8s.io/cloud-provider/fake"
 	"k8s.io/klog/v2"
+	"k8s.io/klog/v2/ktesting"
 )
 
 func Test_NodesDeleted(t *testing.T) {
@@ -510,17 +512,18 @@ func Test_NodesDeleted(t *testing.T) {
 
 	for _, testcase := range testcases {
 		t.Run(testcase.name, func(t *testing.T) {
-			ctx, cancel := context.WithCancel(context.Background())
+			_, ctx := ktesting.NewTestContext(t)
+			ctx, cancel := context.WithCancel(ctx)
 			defer cancel()
 
 			clientset := fake.NewSimpleClientset(testcase.existingNode)
 			informer := informers.NewSharedInformerFactory(clientset, time.Second)
 			nodeInformer := informer.Core().V1().Nodes()
 
-			if err := syncNodeStore(nodeInformer, clientset); err != nil {
+			if err := syncNodeStore(ctx, nodeInformer, clientset); err != nil {
 				t.Errorf("unexpected error: %v", err)
 			}
 
-			eventBroadcaster := record.NewBroadcaster()
+			eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
 			cloudNodeLifecycleController := &CloudNodeLifecycleController{
 				nodeLister: nodeInformer.Lister(),
 				kubeClient: clientset,
@@ -885,15 +888,19 @@ func Test_NodesShutdown(t *testing.T) {
 
 	for _, testcase := range testcases {
 		t.Run(testcase.name, func(t *testing.T) {
+			_, ctx := ktesting.NewTestContext(t)
+			ctx, cancel := context.WithCancel(ctx)
+			defer cancel()
+
 			clientset := fake.NewSimpleClientset(testcase.existingNode)
 			informer := informers.NewSharedInformerFactory(clientset, time.Second)
 			nodeInformer := informer.Core().V1().Nodes()
 
-			if err := syncNodeStore(nodeInformer, clientset); err != nil {
+			if err := syncNodeStore(ctx, nodeInformer, clientset); err != nil {
 				t.Errorf("unexpected error: %v", err)
 			}
 
-			eventBroadcaster := record.NewBroadcaster()
+			eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
 			cloudNodeLifecycleController := &CloudNodeLifecycleController{
 				nodeLister: nodeInformer.Lister(),
 				kubeClient: clientset,
@@ -902,11 +909,11 @@ func Test_NodesShutdown(t *testing.T) {
 				nodeMonitorPeriod: 1 * time.Second,
 			}
 
-			w := eventBroadcaster.StartLogging(klog.Infof)
+			w := eventBroadcaster.StartStructuredLogging(0)
 			defer w.Stop()
 
-			cloudNodeLifecycleController.MonitorNodes(context.TODO())
+			cloudNodeLifecycleController.MonitorNodes(ctx)
 
-			updatedNode, err := clientset.CoreV1().Nodes().Get(context.TODO(), testcase.existingNode.Name, metav1.GetOptions{})
+			updatedNode, err := clientset.CoreV1().Nodes().Get(ctx, testcase.existingNode.Name, metav1.GetOptions{})
 			if testcase.expectedDeleted != apierrors.IsNotFound(err) {
 				t.Fatalf("unexpected error happens when getting the node: %v", err)
 			}
@@ -1047,11 +1054,13 @@ func Test_GetProviderID(t *testing.T) {
 
 	for _, testcase := range testcases {
 		t.Run(testcase.name, func(t *testing.T) {
+			_, ctx := ktesting.NewTestContext(t)
+
 			cloudNodeLifecycleController := &CloudNodeLifecycleController{
 				cloud: testcase.fakeCloud,
 			}
 
-			providerID, err := cloudNodeLifecycleController.getProviderID(context.TODO(), testcase.existingNode)
+			providerID, err := cloudNodeLifecycleController.getProviderID(ctx, testcase.existingNode)
 
 			if err != nil && testcase.expectedErr == nil {
 				t.Fatalf("unexpected error: %v", err)
@@ -1070,8 +1079,8 @@ func Test_GetProviderID(t *testing.T) {
 	}
 }
 
-func syncNodeStore(nodeinformer coreinformers.NodeInformer, f *fake.Clientset) error {
-	nodes, err := f.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
+func syncNodeStore(ctx context.Context, nodeinformer coreinformers.NodeInformer, f *fake.Clientset) error {
+	nodes, err := f.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
 	if err != nil {
 		return err
 	}
diff --git a/staging/src/k8s.io/cloud-provider/controllers/route/route_controller.go b/staging/src/k8s.io/cloud-provider/controllers/route/route_controller.go
index 072e860b0f9..d6e5ab3b800 100644
--- a/staging/src/k8s.io/cloud-provider/controllers/route/route_controller.go
+++ b/staging/src/k8s.io/cloud-provider/controllers/route/route_controller.go
@@ -74,9 +74,6 @@ func New(routes cloudprovider.Routes, kubeClient clientset.Interface, nodeInform
 		klog.Fatal("RouteController: Must specify clusterCIDR.")
 	}
 
-	eventBroadcaster := record.NewBroadcaster()
-	recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "route_controller"})
-
 	rc := &RouteController{
 		routes:           routes,
 		kubeClient:       kubeClient,
@@ -84,8 +81,6 @@ func New(routes cloudprovider.Routes, kubeClient clientset.Interface, nodeInform
 		clusterCIDRs:     clusterCIDRs,
 		nodeLister:       nodeInformer.Lister(),
 		nodeListerSynced: nodeInformer.Informer().HasSynced,
-		broadcaster:      eventBroadcaster,
-		recorder:         recorder,
 	}
 
 	return rc
@@ -94,6 +89,9 @@ func New(routes cloudprovider.Routes, kubeClient clientset.Interface, nodeInform
 func (rc *RouteController) Run(ctx context.Context, syncPeriod time.Duration, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) {
 	defer utilruntime.HandleCrash()
 
+	rc.broadcaster = record.NewBroadcaster(record.WithContext(ctx))
+	rc.recorder = rc.broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "route_controller"})
+
 	// Start event processing pipeline.
 	if rc.broadcaster != nil {
 		rc.broadcaster.StartStructuredLogging(0)
diff --git a/staging/src/k8s.io/cloud-provider/controllers/route/route_controller_test.go b/staging/src/k8s.io/cloud-provider/controllers/route/route_controller_test.go
index 03bf423a6e5..01e6852863b 100644
--- a/staging/src/k8s.io/cloud-provider/controllers/route/route_controller_test.go
+++ b/staging/src/k8s.io/cloud-provider/controllers/route/route_controller_test.go
@@ -31,6 +31,7 @@ import (
 	cloudprovider "k8s.io/cloud-provider"
 	fakecloud "k8s.io/cloud-provider/fake"
 	nodeutil "k8s.io/component-helpers/node/util"
+	"k8s.io/klog/v2/ktesting"
 	netutils "k8s.io/utils/net"
 
 	"github.com/stretchr/testify/assert"
@@ -415,7 +416,8 @@ func TestReconcile(t *testing.T) {
 	}
 	for _, testCase := range testCases {
 		t.Run(testCase.description, func(t *testing.T) {
-			ctx, cancel := context.WithCancel(context.Background())
+			_, ctx := ktesting.NewTestContext(t)
+			ctx, cancel := context.WithCancel(ctx)
 			defer cancel()
 			cloud := &fakecloud.Cloud{RouteMap: make(map[string]*fakecloud.Route)}
 			for _, route := range testCase.initialRoutes {
diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go
index f731e0c7a86..dde3c36c5dd 100644
--- a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go
+++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go
@@ -109,17 +109,12 @@ func New(
 	clusterName string,
 	featureGate featuregate.FeatureGate,
 ) (*Controller, error) {
-	broadcaster := record.NewBroadcaster()
-	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "service-controller"})
-
 	registerMetrics()
 	s := &Controller{
 		cloud:            cloud,
 		kubeClient:       kubeClient,
 		clusterName:      clusterName,
 		cache:            &serviceCache{serviceMap: make(map[string]*cachedService)},
-		eventBroadcaster: broadcaster,
-		eventRecorder:    recorder,
 		nodeLister:       nodeInformer.Lister(),
 		nodeListerSynced: nodeInformer.Informer().HasSynced,
 		serviceQueue:     workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
@@ -219,6 +214,9 @@ func (c *Controller) enqueueNode(obj interface{}) {
 // It's an error to call Run() more than once for a given ServiceController
 // object.
 func (c *Controller) Run(ctx context.Context, workers int, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) {
+	c.eventBroadcaster = record.NewBroadcaster(record.WithContext(ctx))
+	c.eventRecorder = c.eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "service-controller"})
+
 	defer runtime.HandleCrash()
 	defer c.serviceQueue.ShutDown()
 	defer c.nodeQueue.ShutDown()
diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go
index 80ac218683c..fa30e389467 100644
--- a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go
+++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go
@@ -51,6 +51,7 @@ import (
 	fakecloud "k8s.io/cloud-provider/fake"
 	servicehelper "k8s.io/cloud-provider/service/helpers"
 	_ "k8s.io/controller-manager/pkg/features/register"
+	"k8s.io/klog/v2/ktesting"
 	utilpointer "k8s.io/utils/pointer"
 )
 
@@ -150,7 +151,8 @@ func defaultExternalService() *v1.Service {
 // node/service informers and reacting to resource events. Callers can also
 // specify `objects` which represent the initial state of objects, used to
 // populate the client set / informer cache at start-up.
-func newController(stopCh <-chan struct{}, objects ...runtime.Object) (*Controller, *fakecloud.Cloud, *fake.Clientset) {
+func newController(ctx context.Context, objects ...runtime.Object) (*Controller, *fakecloud.Cloud, *fake.Clientset) {
+	stopCh := ctx.Done()
 	cloud := &fakecloud.Cloud{}
 	cloud.Region = region
 
@@ -158,7 +160,7 @@ func newController(stopCh <-chan struct{}, objects ...runtime.Object) (*Controll
 	informerFactory := informers.NewSharedInformerFactory(kubeClient, 0)
 	serviceInformer := informerFactory.Core().V1().Services()
 	nodeInformer := informerFactory.Core().V1().Nodes()
-	broadcaster := record.NewBroadcaster()
+	broadcaster := record.NewBroadcaster(record.WithContext(ctx))
 	broadcaster.StartStructuredLogging(0)
 	broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
 	recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "service-controller"})
@@ -275,9 +277,10 @@ func TestSyncLoadBalancerIfNeeded(t *testing.T) {
 
 	for _, tc := range testCases {
 		t.Run(tc.desc, func(t *testing.T) {
-			ctx, cancel := context.WithCancel(context.Background())
+			_, ctx := ktesting.NewTestContext(t)
+			ctx, cancel := context.WithCancel(ctx)
 			defer cancel()
-			controller, cloud, client := newController(nil)
+			controller, cloud, client := newController(ctx)
 			cloud.Exists = tc.lbExists
 			key := fmt.Sprintf("%s/%s", tc.service.Namespace, tc.service.Name)
 			if _, err := client.CoreV1().Services(tc.service.Namespace).Create(ctx, tc.service, metav1.CreateOptions{}); err != nil {
@@ -321,7 +324,7 @@ func TestSyncLoadBalancerIfNeeded(t *testing.T) {
 			}
 
 			for _, balancer := range cloud.Balancers {
-				if balancer.Name != controller.balancer.GetLoadBalancerName(context.Background(), "", tc.service) ||
+				if balancer.Name != controller.balancer.GetLoadBalancerName(ctx, "", tc.service) ||
 					balancer.Region != region ||
 					balancer.Ports[0].Port != tc.service.Spec.Ports[0].Port {
 					t.Errorf("Created load balancer has incorrect parameters: %v", balancer)
@@ -449,9 +452,10 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
 	}}
 	for _, item := range table {
 		t.Run(item.desc, func(t *testing.T) {
-			ctx, cancel := context.WithCancel(context.Background())
+			_, ctx := ktesting.NewTestContext(t)
+			ctx, cancel := context.WithCancel(ctx)
 			defer cancel()
-			controller, cloud, _ := newController(nil)
+			controller, cloud, _ := newController(ctx)
 			controller.nodeLister = newFakeNodeLister(nil, nodes...)
 			if servicesToRetry := controller.updateLoadBalancerHosts(ctx, item.services, item.workers); len(servicesToRetry) != 0 {
 				t.Errorf("for case %q, unexpected servicesToRetry: %v", item.desc, servicesToRetry)
@@ -607,11 +611,12 @@ func TestNodeChangesForStableNodeSetEnabled(t *testing.T) {
 		expectedUpdateCalls: []fakecloud.UpdateBalancerCall{},
 	}} {
 		t.Run(tc.desc, func(t *testing.T) {
-			controller, cloud, _ := newController(nil)
-
-			ctx, cancel := context.WithCancel(context.Background())
+			_, ctx := ktesting.NewTestContext(t)
+			ctx, cancel := context.WithCancel(ctx)
 			defer cancel()
 
+			controller, cloud, _ := newController(ctx)
+
 			for _, svc := range services {
 				key, _ := cache.MetaNamespaceKeyFunc(svc)
 				controller.lastSyncedNodes[key] = tc.initialState
@@ -638,6 +643,10 @@ func TestNodeChangesForStableNodeSetEnabled(t *testing.T) {
 }
 
 func TestNodeChangesInExternalLoadBalancer(t *testing.T) {
+	_, ctx := ktesting.NewTestContext(t)
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
 	node1 := makeNode(tweakName("node1"))
 	node2 := makeNode(tweakName("node2"))
 	node3 := makeNode(tweakName("node3"))
@@ -655,7 +664,7 @@ func TestNodeChangesInExternalLoadBalancer(t *testing.T) {
 		serviceNames.Insert(fmt.Sprintf("%s/%s", svc.GetObjectMeta().GetNamespace(), svc.GetObjectMeta().GetName()))
 	}
 
-	controller, cloud, _ := newController(nil)
+	controller, cloud, _ := newController(ctx)
 	for _, tc := range []struct {
 		desc                  string
 		nodes                 []*v1.Node
@@ -715,7 +724,8 @@ func TestNodeChangesInExternalLoadBalancer(t *testing.T) {
 		expectedRetryServices: serviceNames,
 	}} {
 		t.Run(tc.desc, func(t *testing.T) {
-			ctx, cancel := context.WithCancel(context.Background())
+			_, ctx := ktesting.NewTestContext(t)
+			ctx, cancel := context.WithCancel(ctx)
 			defer cancel()
 			controller.nodeLister = newFakeNodeLister(tc.nodeListerErr, tc.nodes...)
 			servicesToRetry := controller.updateLoadBalancerHosts(ctx, services, tc.worker)
@@ -771,7 +781,7 @@ func compareHostSets(t *testing.T, left, right []*v1.Node) bool {
 }
 
 func TestNodesNotEqual(t *testing.T) {
-	controller, cloud, _ := newController(nil)
+	_, ctx := ktesting.NewTestContext(t)
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	controller, cloud, _ := newController(ctx)
 
 	services := []*v1.Service{
 		newService("s0", v1.ServiceTypeLoadBalancer),
@@ -818,7 +832,8 @@ func TestNodesNotEqual(t *testing.T) {
 
 	for _, tc := range testCases {
 		t.Run(tc.desc, func(t *testing.T) {
-			ctx, cancel := context.WithCancel(context.Background())
+			_, ctx := ktesting.NewTestContext(t)
+			ctx, cancel := context.WithCancel(ctx)
 			defer cancel()
 			controller.nodeLister = newFakeNodeLister(nil, tc.newNodes...)
@@ -835,7 +850,11 @@ func TestNodesNotEqual(t *testing.T) {
 }
 
 func TestProcessServiceCreateOrUpdate(t *testing.T) {
-	controller, _, client := newController(nil)
+	_, ctx := ktesting.NewTestContext(t)
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+
+	controller, _, client := newController(ctx)
 
 	//A pair of old and new loadbalancer IP address
 	oldLBIP := "192.168.1.1"
@@ -908,7 +927,8 @@ func TestProcessServiceCreateOrUpdate(t *testing.T) {
 	}}
 
 	for _, tc := range testCases {
-		ctx, cancel := context.WithCancel(context.Background())
+		_, ctx := ktesting.NewTestContext(t)
+		ctx, cancel := context.WithCancel(ctx)
 		defer cancel()
 		newSvc := tc.updateFn(tc.svc)
 		if _, err := client.CoreV1().Services(tc.svc.Namespace).Create(ctx, tc.svc, metav1.CreateOptions{}); err != nil {
@@ -945,12 +965,13 @@ func TestProcessServiceCreateOrUpdateK8sError(t *testing.T) {
 
 	for _, tc := range testCases {
 		t.Run(tc.desc, func(t *testing.T) {
-			ctx, cancel := context.WithCancel(context.Background())
+			_, ctx := ktesting.NewTestContext(t)
+			ctx, cancel := context.WithCancel(ctx)
 			defer cancel()
 			svc := newService(svcName, v1.ServiceTypeLoadBalancer)
 			// Preset finalizer so k8s error only happens when patching status.
 			svc.Finalizers = []string{servicehelper.LoadBalancerCleanupFinalizer}
-			controller, _, client := newController(nil)
+			controller, _, client := newController(ctx)
 			client.PrependReactor("patch", "services", func(action core.Action) (bool, runtime.Object, error) {
 				return true, nil, tc.k8sErr
 			})
@@ -987,14 +1008,14 @@ func TestSyncService(t *testing.T) {
 	testCases := []struct {
 		testName   string
 		key        string
-		updateFn   func()            //Function to manipulate the controller element to simulate error
-		expectedFn func(error) error //Expected function if returns nil then test passed, failed otherwise
+		updateFn   func(context.Context) // Function to manipulate the controller element to simulate error
+		expectedFn func(error) error     // Expected function if returns nil then test passed, failed otherwise
 	}{
 		{
 			testName: "if an invalid service name is synced",
 			key:      "invalid/key/string",
-			updateFn: func() {
-				controller, _, _ = newController(nil)
+			updateFn: func(ctx context.Context) {
+				controller, _, _ = newController(ctx)
 			},
 			expectedFn: func(e error) error {
 				//TODO: should find a way to test for dependent package errors in such a way that it won't break
@@ -1024,9 +1045,9 @@ func TestSyncService(t *testing.T) {
 		{
 			testName: "if valid service",
 			key:      "external-balancer",
-			updateFn: func() {
+			updateFn: func(ctx context.Context) {
 				testSvc := defaultExternalService()
-				controller, _, _ = newController(nil)
+				controller, _, _ = newController(ctx)
 				controller.enqueueService(testSvc)
 				svc := controller.cache.getOrCreate("external-balancer")
 				svc.state = testSvc
@@ -1043,10 +1064,11 @@ func TestSyncService(t *testing.T) {
 
 	for _, tc := range testCases {
 		t.Run(tc.testName, func(t *testing.T) {
-			ctx, cancel := context.WithCancel(context.Background())
+			_, ctx := ktesting.NewTestContext(t)
+			ctx, cancel := context.WithCancel(ctx)
 			defer cancel()
 
-			tc.updateFn()
+			tc.updateFn(ctx)
 			obtainedErr := controller.syncService(ctx, tc.key)
 
 			//expected matches obtained ??.
@@ -1128,11 +1150,12 @@ func TestProcessServiceDeletion(t *testing.T) {
 	}}
 
 	for _, tc := range testCases {
-		ctx, cancel := context.WithCancel(context.Background())
+		_, ctx := ktesting.NewTestContext(t)
+		ctx, cancel := context.WithCancel(ctx)
 		defer cancel()
 
 		//Create a new controller.
-		controller, cloud, _ = newController(nil)
+		controller, cloud, _ = newController(ctx)
 		tc.updateFn(controller)
 		obtainedErr := controller.processServiceDeletion(ctx, svcKey)
 		if err := tc.expectedFn(obtainedErr); err != nil {
@@ -1213,8 +1236,10 @@ func TestNeedsCleanup(t *testing.T) {
 // make sure that the slow node sync never removes the Node from LB set because it
 // has stale data.
 func TestSlowNodeSync(t *testing.T) {
-	stopCh, syncServiceDone, syncService := make(chan struct{}), make(chan string), make(chan string)
-	defer close(stopCh)
+	_, ctx := ktesting.NewTestContext(t)
+	ctx, cancel := context.WithCancel(ctx)
+	defer cancel()
+	syncServiceDone, syncService := make(chan string), make(chan string)
 	defer close(syncService)
 
 	node1 := makeNode(tweakName("node1"))
@@ -1227,7 +1252,7 @@ func TestSlowNodeSync(t *testing.T) {
 	sKey2, _ := cache.MetaNamespaceKeyFunc(service2)
 	serviceKeys := sets.New(sKey1, sKey2)
 
-	controller, cloudProvider, kubeClient := newController(stopCh, node1, node2, service1, service2)
+	controller, cloudProvider, kubeClient := newController(ctx, node1, node2, service1, service2)
 	cloudProvider.UpdateCallCb = func(update fakecloud.UpdateBalancerCall) {
 		key, _ := cache.MetaNamespaceKeyFunc(update.Service)
 		impactedService := serviceKeys.Difference(sets.New(key)).UnsortedList()[0]
@@ -1263,17 +1288,17 @@ func TestSlowNodeSync(t *testing.T) {
 	wg.Add(1)
 	go func() {
 		defer wg.Done()
-		controller.syncNodes(context.TODO(), 1)
+		controller.syncNodes(ctx, 1)
 	}()
 
 	key := <-syncService
-	if _, err := kubeClient.CoreV1().Nodes().Create(context.TODO(), node3, metav1.CreateOptions{}); err != nil {
+	if _, err := kubeClient.CoreV1().Nodes().Create(ctx, node3, metav1.CreateOptions{}); err != nil {
 		t.Fatalf("error creating node3, err: %v", err)
 	}
 
 	// Allow a bit of time for the informer cache to get populated with the new
 	// node
-	if err := wait.PollUntilContextCancel(wait.ContextForChannel(stopCh), 10*time.Millisecond, true, func(ctx context.Context) (done bool, err error) {
+	if err := wait.PollUntilContextCancel(ctx, 10*time.Millisecond, true, func(ctx context.Context) (done bool, err error) {
 		n3, _ := controller.nodeLister.Get("node3")
 		return n3 != nil, nil
 	}); err != nil {
@@ -1281,7 +1306,7 @@ func TestSlowNodeSync(t *testing.T) {
 	}
 
 	// Sync the service
-	if err := controller.syncService(context.TODO(), key); err != nil {
+	if err := controller.syncService(ctx, key); err != nil {
 		t.Fatalf("unexpected service sync error, err: %v", err)
 	}
 
@@ -1465,13 +1490,18 @@ func TestNeedsUpdate(t *testing.T) {
 		expectedNeedsUpdate: true,
 	}}
 
-	controller, _, _ := newController(nil)
 	for _, tc := range testCases {
-		oldSvc, newSvc := tc.updateFn()
-		obtainedResult := controller.needsUpdate(oldSvc, newSvc)
-		if obtainedResult != tc.expectedNeedsUpdate {
-			t.Errorf("%v needsUpdate() should have returned %v but returned %v", tc.testName, tc.expectedNeedsUpdate, obtainedResult)
-		}
+		t.Run(tc.testName, func(t *testing.T) {
+			_, ctx := ktesting.NewTestContext(t)
+			ctx, cancel := context.WithCancel(ctx)
+			defer cancel()
+			controller, _, _ := newController(ctx)
+			oldSvc, newSvc := tc.updateFn()
+			obtainedResult := controller.needsUpdate(oldSvc, newSvc)
+			if obtainedResult != tc.expectedNeedsUpdate {
+				t.Errorf("%v needsUpdate() should have returned %v but returned %v", tc.testName, tc.expectedNeedsUpdate, obtainedResult)
+			}
+		})
 	}
 }
 
@@ -1606,7 +1636,8 @@ func TestAddFinalizer(t *testing.T) {
 
 	for _, tc := range testCases {
 		t.Run(tc.desc, func(t *testing.T) {
-			ctx, cancel := context.WithCancel(context.Background())
+			_, ctx := ktesting.NewTestContext(t)
+			ctx, cancel := context.WithCancel(ctx)
 			defer cancel()
 			c := fake.NewSimpleClientset()
 			s := &Controller{
@@ -1659,7 +1690,8 @@ func TestRemoveFinalizer(t *testing.T) {
 
 	for _, tc := range testCases {
 		t.Run(tc.desc, func(t *testing.T) {
-			ctx, cancel := context.WithCancel(context.Background())
+			_, ctx := ktesting.NewTestContext(t)
+			ctx, cancel := context.WithCancel(ctx)
 			defer cancel()
 			c := fake.NewSimpleClientset()
 			s := &Controller{
@@ -1756,7 +1788,8 @@ func TestPatchStatus(t *testing.T) {
 
 	for _, tc := range testCases {
 		t.Run(tc.desc, func(t *testing.T) {
-			ctx, cancel := context.WithCancel(context.Background())
+			_, ctx := ktesting.NewTestContext(t)
+			ctx, cancel := context.WithCancel(ctx)
 			defer cancel()
 			c := fake.NewSimpleClientset()
 			s := &Controller{
@@ -2277,7 +2310,10 @@ func TestServiceQueueDelay(t *testing.T) {
 
 	for _, tc := range tests {
 		t.Run(tc.name, func(t *testing.T) {
-			controller, cloud, client := newController(nil)
+			_, ctx := ktesting.NewTestContext(t)
+			ctx, cancel := context.WithCancel(ctx)
+			defer cancel()
+			controller, cloud, client := newController(ctx)
 			queue := &spyWorkQueue{RateLimitingInterface: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test-service-queue-delay")}
 			controller.serviceQueue = queue
 			cloud.Err = tc.lbCloudErr
@@ -2290,7 +2326,6 @@ func TestServiceQueueDelay(t *testing.T) {
 				t.Fatalf("adding service %s to cache: %s", svc.Name, err)
 			}
 
-			ctx := context.Background()
 			_, err := client.CoreV1().Services(ns).Create(ctx, svc, metav1.CreateOptions{})
 			if err != nil {
 				t.Fatal(err)
diff --git a/staging/src/k8s.io/cloud-provider/options/options.go b/staging/src/k8s.io/cloud-provider/options/options.go
index 15be5a86c07..b96948106ca 100644
--- a/staging/src/k8s.io/cloud-provider/options/options.go
+++ b/staging/src/k8s.io/cloud-provider/options/options.go
@@ -17,6 +17,7 @@ limitations under the License.
 package options
 
 import (
+	"context"
 	"fmt"
 	"math/rand"
 	"net"
@@ -221,7 +222,7 @@ func (o *CloudControllerManagerOptions) ApplyTo(c *config.Config, allControllers
 		return err
 	}
 
-	c.EventBroadcaster = record.NewBroadcaster()
+	c.EventBroadcaster = record.NewBroadcaster(record.WithContext(context.TODO())) // TODO: move broadcaster construction to a place where there is a proper context.
	c.EventRecorder = c.EventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: userAgent})
 
 	rootClientBuilder := clientbuilder.SimpleControllerClientBuilder{
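
As a usage note, a minimal sketch of the pattern this patch applies throughout: the event broadcaster is no longer built in the constructor but inside `Run` with the caller's context, and the stop-channel `Run` becomes a thin wrapper via `wait.ContextForChannel`. The `exampleController` type and its field names are hypothetical stand-ins, not part of the patch.

```go
package example

import (
	"context"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes/scheme"
	"k8s.io/client-go/tools/record"
)

// exampleController is an illustrative stand-in for the controllers in this patch.
type exampleController struct {
	broadcaster record.EventBroadcaster
	recorder    record.EventRecorder
}

// Run keeps the legacy stop-channel signature and delegates to the
// context-aware entry point, as CloudNodeController.Run does above.
func (c *exampleController) Run(stopCh <-chan struct{}) {
	c.RunWithContext(wait.ContextForChannel(stopCh))
}

// RunWithContext builds the broadcaster here, not in the constructor, so the
// event pipeline inherits the caller's context and its contextual logger.
func (c *exampleController) RunWithContext(ctx context.Context) {
	c.broadcaster = record.NewBroadcaster(record.WithContext(ctx))
	c.recorder = c.broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "example-controller"})
	defer c.broadcaster.Shutdown()

	// Periodic work is driven by the context instead of a stop channel.
	wait.UntilWithContext(ctx, func(ctx context.Context) {
		// ... reconcile using ctx ...
	}, time.Second)
}
```

On the test side, the patch swaps `context.Background()` for `ktesting.NewTestContext(t)`, which returns a context carrying a per-test klog logger. A minimal sketch with a hypothetical test name, assuming it lives in an `_test.go` file:

```go
package example

import (
	"context"
	"testing"

	"k8s.io/client-go/tools/record"
	"k8s.io/klog/v2/ktesting"
)

func TestExampleController(t *testing.T) {
	// The returned context carries the per-test logger; wrapping it in
	// WithCancel mirrors the tests above and guarantees cleanup at test end.
	_, ctx := ktesting.NewTestContext(t)
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	broadcaster := record.NewBroadcaster(record.WithContext(ctx))
	defer broadcaster.Shutdown()
	// ... drive the controller with ctx and assert on a fake clientset ...
}
```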