cloud providers: enhance context support

27a68aee3a introduced context support for events. Creating an event
broadcaster with context makes tests more resilient against leaking goroutines
when that context gets canceled at the end of a test and enables per-test
output via ktesting.

While at it, all context.TODO calls get removed in files that were touched.
This commit is contained in:
Patrick Ohly 2023-12-01 09:00:59 +01:00
parent e19ff7771d
commit 50c1243760
9 changed files with 182 additions and 109 deletions

View File

@ -120,9 +120,6 @@ func NewCloudNodeController(
nodeStatusUpdateFrequency time.Duration,
workerCount int32) (*CloudNodeController, error) {
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"})
_, instancesSupported := cloud.Instances()
_, instancesV2Supported := cloud.InstancesV2()
if !instancesSupported && !instancesV2Supported {
@ -132,8 +129,6 @@ func NewCloudNodeController(
cnc := &CloudNodeController{
nodeInformer: nodeInformer,
kubeClient: kubeClient,
broadcaster: eventBroadcaster,
recorder: recorder,
cloud: cloud,
nodeStatusUpdateFrequency: nodeStatusUpdateFrequency,
workerCount: workerCount,
@ -156,7 +151,21 @@ func NewCloudNodeController(
// This controller updates newly registered nodes with information
// from the cloud provider. This call is blocking so should be called
// via a goroutine
//
//logcheck:context // RunWithContext should be used instead of Run in code which supports contextual logging.
func (cnc *CloudNodeController) Run(stopCh <-chan struct{}, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) {
cnc.RunWithContext(wait.ContextForChannel(stopCh), controllerManagerMetrics)
}
// RunWithContext will sync informer caches and starting workers.
// This controller updates newly registered nodes with information
// from the cloud provider. This call is blocking so should be called
// via a goroutine
func (cnc *CloudNodeController) RunWithContext(ctx context.Context, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) {
cnc.broadcaster = record.NewBroadcaster(record.WithContext(ctx))
cnc.recorder = cnc.broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-controller"})
stopCh := ctx.Done()
defer utilruntime.HandleCrash()
defer cnc.workqueue.ShutDown()
@ -178,16 +187,16 @@ func (cnc *CloudNodeController) Run(stopCh <-chan struct{}, controllerManagerMet
// The periodic loop for updateNodeStatus polls the Cloud Provider periodically
// to reconcile the nodes addresses and labels.
go wait.Until(func() {
if err := cnc.UpdateNodeStatus(context.TODO()); err != nil {
go wait.UntilWithContext(ctx, func(ctx context.Context) {
if err := cnc.UpdateNodeStatus(ctx); err != nil {
klog.Errorf("failed to update node status: %v", err)
}
}, cnc.nodeStatusUpdateFrequency, stopCh)
}, cnc.nodeStatusUpdateFrequency)
// These workers initialize the nodes added to the cluster,
// those that are Tainted with TaintExternalCloudProvider.
for i := int32(0); i < cnc.workerCount; i++ {
go wait.Until(cnc.runWorker, time.Second, stopCh)
go wait.UntilWithContext(ctx, cnc.runWorker, time.Second)
}
<-stopCh
@ -196,14 +205,14 @@ func (cnc *CloudNodeController) Run(stopCh <-chan struct{}, controllerManagerMet
// runWorker is a long-running function that will continually call the
// processNextWorkItem function in order to read and process a message on the
// workqueue.
func (cnc *CloudNodeController) runWorker() {
for cnc.processNextWorkItem() {
func (cnc *CloudNodeController) runWorker(ctx context.Context) {
for cnc.processNextWorkItem(ctx) {
}
}
// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling the syncHandler.
func (cnc *CloudNodeController) processNextWorkItem() bool {
func (cnc *CloudNodeController) processNextWorkItem(ctx context.Context) bool {
obj, shutdown := cnc.workqueue.Get()
if shutdown {
return false
@ -223,7 +232,7 @@ func (cnc *CloudNodeController) processNextWorkItem() bool {
// Run the syncHandler, passing it the key of the
// Node resource to be synced.
if err := cnc.syncHandler(key); err != nil {
if err := cnc.syncHandler(ctx, key); err != nil {
// Put the item back on the workqueue to handle any transient errors.
cnc.workqueue.AddRateLimited(key)
klog.Infof("error syncing '%s': %v, requeuing", key, err)
@ -245,14 +254,14 @@ func (cnc *CloudNodeController) processNextWorkItem() bool {
}
// syncHandler implements the logic of the controller.
func (cnc *CloudNodeController) syncHandler(key string) error {
func (cnc *CloudNodeController) syncHandler(ctx context.Context, key string) error {
_, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
utilruntime.HandleError(fmt.Errorf("invalid resource key: %s", key))
return nil
}
return cnc.syncNode(context.TODO(), name)
return cnc.syncNode(ctx, name)
}
// UpdateNodeStatus updates the node status, such as node addresses
@ -456,7 +465,7 @@ func (cnc *CloudNodeController) syncNode(ctx context.Context, nodeName string) e
modify(newNode)
}
_, err = cnc.kubeClient.CoreV1().Nodes().Update(context.TODO(), newNode, metav1.UpdateOptions{})
_, err = cnc.kubeClient.CoreV1().Nodes().Update(ctx, newNode, metav1.UpdateOptions{})
if err != nil {
return err
}

View File

@ -28,6 +28,7 @@ import (
"k8s.io/client-go/kubernetes/fake"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@ -1777,10 +1778,14 @@ func Test_syncNode(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
clientset := fake.NewSimpleClientset(test.existingNode)
factory := informers.NewSharedInformerFactory(clientset, 0)
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
cloudNodeController := &CloudNodeController{
kubeClient: clientset,
nodeInformer: factory.Core().V1().Nodes(),
@ -1799,12 +1804,12 @@ func Test_syncNode(t *testing.T) {
w := eventBroadcaster.StartLogging(klog.Infof)
defer w.Stop()
err := cloudNodeController.syncNode(context.TODO(), test.existingNode.Name)
err := cloudNodeController.syncNode(ctx, test.existingNode.Name)
if (err != nil) != test.expectedErr {
t.Fatalf("error got: %v expected: %v", err, test.expectedErr)
}
updatedNode, err := clientset.CoreV1().Nodes().Get(context.TODO(), test.existingNode.Name, metav1.GetOptions{})
updatedNode, err := clientset.CoreV1().Nodes().Get(ctx, test.existingNode.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("error getting updated nodes: %v", err)
}
@ -1884,6 +1889,11 @@ func Test_reconcileNodeLabels(t *testing.T) {
for _, test := range testcases {
t.Run(test.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
stopCh := ctx.Done()
testNode := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node01",
@ -1899,9 +1909,6 @@ func Test_reconcileNodeLabels(t *testing.T) {
nodeInformer: factory.Core().V1().Nodes(),
}
stopCh := make(chan struct{})
defer close(stopCh)
// activate node informer
factory.Core().V1().Nodes().Informer()
factory.Start(stopCh)
@ -1914,7 +1921,7 @@ func Test_reconcileNodeLabels(t *testing.T) {
t.Errorf("unexpected error")
}
actualNode, err := clientset.CoreV1().Nodes().Get(context.TODO(), "node01", metav1.GetOptions{})
actualNode, err := clientset.CoreV1().Nodes().Get(ctx, "node01", metav1.GetOptions{})
if err != nil {
t.Fatalf("error getting updated node: %v", err)
}
@ -2062,6 +2069,10 @@ func TestNodeAddressesChangeDetected(t *testing.T) {
// Test updateNodeAddress with instanceV2, same test case with TestNodeAddressesNotUpdate.
func TestNodeAddressesNotUpdateV2(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
existingNode := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
@ -2119,13 +2130,13 @@ func TestNodeAddressesNotUpdateV2(t *testing.T) {
cloud: fakeCloud,
}
instanceMeta, err := cloudNodeController.getInstanceNodeAddresses(context.TODO(), existingNode)
instanceMeta, err := cloudNodeController.getInstanceNodeAddresses(ctx, existingNode)
if err != nil {
t.Errorf("get instance metadata with error %v", err)
}
cloudNodeController.updateNodeAddress(context.TODO(), existingNode, instanceMeta)
cloudNodeController.updateNodeAddress(ctx, existingNode, instanceMeta)
updatedNode, err := clientset.CoreV1().Nodes().Get(context.TODO(), existingNode.Name, metav1.GetOptions{})
updatedNode, err := clientset.CoreV1().Nodes().Get(ctx, existingNode.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("error getting updated nodes: %v", err)
}
@ -2138,6 +2149,10 @@ func TestNodeAddressesNotUpdateV2(t *testing.T) {
// This test checks that a node with the external cloud provider taint is cloudprovider initialized and
// and node addresses will not be updated when node isn't present according to the cloudprovider
func TestNodeAddressesNotUpdate(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
existingNode := &v1.Node{
ObjectMeta: metav1.ObjectMeta{
Name: "node0",
@ -2195,13 +2210,13 @@ func TestNodeAddressesNotUpdate(t *testing.T) {
cloud: fakeCloud,
}
instanceMeta, err := cloudNodeController.getInstanceNodeAddresses(context.TODO(), existingNode)
instanceMeta, err := cloudNodeController.getInstanceNodeAddresses(ctx, existingNode)
if err != nil {
t.Errorf("get instance metadata with error %v", err)
}
cloudNodeController.updateNodeAddress(context.TODO(), existingNode, instanceMeta)
cloudNodeController.updateNodeAddress(ctx, existingNode, instanceMeta)
updatedNode, err := clientset.CoreV1().Nodes().Get(context.TODO(), existingNode.Name, metav1.GetOptions{})
updatedNode, err := clientset.CoreV1().Nodes().Get(ctx, existingNode.Name, metav1.GetOptions{})
if err != nil {
t.Fatalf("error getting updated nodes: %v", err)
}
@ -2514,11 +2529,13 @@ func TestGetInstanceMetadata(t *testing.T) {
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
cloudNodeController := &CloudNodeController{
cloud: test.fakeCloud,
}
metadata, err := cloudNodeController.getInstanceMetadata(context.TODO(), test.existingNode)
metadata, err := cloudNodeController.getInstanceMetadata(ctx, test.existingNode)
if (err != nil) != test.expectErr {
t.Fatalf("error expected %v got: %v", test.expectErr, err)
}
@ -2574,6 +2591,10 @@ func TestUpdateNodeStatus(t *testing.T) {
}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
fakeCloud := &fakecloud.Cloud{
EnableInstancesV2: false,
Addresses: []v1.NodeAddress{
@ -2600,7 +2621,7 @@ func TestUpdateNodeStatus(t *testing.T) {
})
factory := informers.NewSharedInformerFactory(clientset, 0)
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
nodeInformer := factory.Core().V1().Nodes()
nodeIndexer := nodeInformer.Informer().GetIndexer()
cloudNodeController := &CloudNodeController{
@ -2621,11 +2642,13 @@ func TestUpdateNodeStatus(t *testing.T) {
}
}
w := eventBroadcaster.StartLogging(klog.Infof)
w := eventBroadcaster.StartStructuredLogging(0)
defer w.Stop()
start := time.Now()
cloudNodeController.UpdateNodeStatus(context.TODO())
if err := cloudNodeController.UpdateNodeStatus(ctx); err != nil {
t.Fatalf("error updating node status: %v", err)
}
t.Logf("%d workers: processed %d nodes int %v ", test.workers, test.nodes, time.Since(start))
if len(fakeCloud.Calls) != test.nodes {
t.Errorf("expected %d cloud-provider calls, got %d", test.nodes, len(fakeCloud.Calls))

View File

@ -21,7 +21,7 @@ import (
"errors"
"time"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/types"
@ -74,9 +74,6 @@ func NewCloudNodeLifecycleController(
cloud cloudprovider.Interface,
nodeMonitorPeriod time.Duration) (*CloudNodeLifecycleController, error) {
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-lifecycle-controller"})
if kubeClient == nil {
return nil, errors.New("kubernetes client is nil")
}
@ -94,8 +91,6 @@ func NewCloudNodeLifecycleController(
c := &CloudNodeLifecycleController{
kubeClient: kubeClient,
nodeLister: nodeInformer.Lister(),
broadcaster: eventBroadcaster,
recorder: recorder,
cloud: cloud,
nodeMonitorPeriod: nodeMonitorPeriod,
}
@ -106,6 +101,9 @@ func NewCloudNodeLifecycleController(
// Run starts the main loop for this controller. Run is blocking so should
// be called via a goroutine
func (c *CloudNodeLifecycleController) Run(ctx context.Context, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) {
c.broadcaster = record.NewBroadcaster(record.WithContext(ctx))
c.recorder = c.broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "cloud-node-lifecycle-controller"})
defer utilruntime.HandleCrash()
controllerManagerMetrics.ControllerStarted("cloud-node-lifecycle")
defer controllerManagerMetrics.ControllerStopped("cloud-node-lifecycle")

View File

@ -19,11 +19,12 @@ package cloud
import (
"context"
"errors"
"github.com/google/go-cmp/cmp"
"reflect"
"testing"
"time"
"github.com/google/go-cmp/cmp"
v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -36,6 +37,7 @@ import (
cloudprovider "k8s.io/cloud-provider"
fakecloud "k8s.io/cloud-provider/fake"
"k8s.io/klog/v2"
"k8s.io/klog/v2/ktesting"
)
func Test_NodesDeleted(t *testing.T) {
@ -510,17 +512,18 @@ func Test_NodesDeleted(t *testing.T) {
for _, testcase := range testcases {
t.Run(testcase.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
clientset := fake.NewSimpleClientset(testcase.existingNode)
informer := informers.NewSharedInformerFactory(clientset, time.Second)
nodeInformer := informer.Core().V1().Nodes()
if err := syncNodeStore(nodeInformer, clientset); err != nil {
if err := syncNodeStore(ctx, nodeInformer, clientset); err != nil {
t.Errorf("unexpected error: %v", err)
}
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
cloudNodeLifecycleController := &CloudNodeLifecycleController{
nodeLister: nodeInformer.Lister(),
kubeClient: clientset,
@ -885,15 +888,19 @@ func Test_NodesShutdown(t *testing.T) {
for _, testcase := range testcases {
t.Run(testcase.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
clientset := fake.NewSimpleClientset(testcase.existingNode)
informer := informers.NewSharedInformerFactory(clientset, time.Second)
nodeInformer := informer.Core().V1().Nodes()
if err := syncNodeStore(nodeInformer, clientset); err != nil {
if err := syncNodeStore(ctx, nodeInformer, clientset); err != nil {
t.Errorf("unexpected error: %v", err)
}
eventBroadcaster := record.NewBroadcaster()
eventBroadcaster := record.NewBroadcaster(record.WithContext(ctx))
cloudNodeLifecycleController := &CloudNodeLifecycleController{
nodeLister: nodeInformer.Lister(),
kubeClient: clientset,
@ -902,11 +909,11 @@ func Test_NodesShutdown(t *testing.T) {
nodeMonitorPeriod: 1 * time.Second,
}
w := eventBroadcaster.StartLogging(klog.Infof)
w := eventBroadcaster.StartStructuredLogging(0)
defer w.Stop()
cloudNodeLifecycleController.MonitorNodes(context.TODO())
cloudNodeLifecycleController.MonitorNodes(ctx)
updatedNode, err := clientset.CoreV1().Nodes().Get(context.TODO(), testcase.existingNode.Name, metav1.GetOptions{})
updatedNode, err := clientset.CoreV1().Nodes().Get(ctx, testcase.existingNode.Name, metav1.GetOptions{})
if testcase.expectedDeleted != apierrors.IsNotFound(err) {
t.Fatalf("unexpected error happens when getting the node: %v", err)
}
@ -1047,11 +1054,13 @@ func Test_GetProviderID(t *testing.T) {
for _, testcase := range testcases {
t.Run(testcase.name, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
cloudNodeLifecycleController := &CloudNodeLifecycleController{
cloud: testcase.fakeCloud,
}
providerID, err := cloudNodeLifecycleController.getProviderID(context.TODO(), testcase.existingNode)
providerID, err := cloudNodeLifecycleController.getProviderID(ctx, testcase.existingNode)
if err != nil && testcase.expectedErr == nil {
t.Fatalf("unexpected error: %v", err)
@ -1070,8 +1079,8 @@ func Test_GetProviderID(t *testing.T) {
}
}
func syncNodeStore(nodeinformer coreinformers.NodeInformer, f *fake.Clientset) error {
nodes, err := f.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{})
func syncNodeStore(ctx context.Context, nodeinformer coreinformers.NodeInformer, f *fake.Clientset) error {
nodes, err := f.CoreV1().Nodes().List(ctx, metav1.ListOptions{})
if err != nil {
return err
}

View File

@ -74,9 +74,6 @@ func New(routes cloudprovider.Routes, kubeClient clientset.Interface, nodeInform
klog.Fatal("RouteController: Must specify clusterCIDR.")
}
eventBroadcaster := record.NewBroadcaster()
recorder := eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "route_controller"})
rc := &RouteController{
routes: routes,
kubeClient: kubeClient,
@ -84,8 +81,6 @@ func New(routes cloudprovider.Routes, kubeClient clientset.Interface, nodeInform
clusterCIDRs: clusterCIDRs,
nodeLister: nodeInformer.Lister(),
nodeListerSynced: nodeInformer.Informer().HasSynced,
broadcaster: eventBroadcaster,
recorder: recorder,
}
return rc
@ -94,6 +89,9 @@ func New(routes cloudprovider.Routes, kubeClient clientset.Interface, nodeInform
func (rc *RouteController) Run(ctx context.Context, syncPeriod time.Duration, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) {
defer utilruntime.HandleCrash()
rc.broadcaster = record.NewBroadcaster(record.WithContext(ctx))
rc.recorder = rc.broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "route_controller"})
// Start event processing pipeline.
if rc.broadcaster != nil {
rc.broadcaster.StartStructuredLogging(0)

View File

@ -31,6 +31,7 @@ import (
cloudprovider "k8s.io/cloud-provider"
fakecloud "k8s.io/cloud-provider/fake"
nodeutil "k8s.io/component-helpers/node/util"
"k8s.io/klog/v2/ktesting"
netutils "k8s.io/utils/net"
"github.com/stretchr/testify/assert"
@ -415,7 +416,8 @@ func TestReconcile(t *testing.T) {
}
for _, testCase := range testCases {
t.Run(testCase.description, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
cloud := &fakecloud.Cloud{RouteMap: make(map[string]*fakecloud.Route)}
for _, route := range testCase.initialRoutes {

View File

@ -109,17 +109,12 @@ func New(
clusterName string,
featureGate featuregate.FeatureGate,
) (*Controller, error) {
broadcaster := record.NewBroadcaster()
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "service-controller"})
registerMetrics()
s := &Controller{
cloud: cloud,
kubeClient: kubeClient,
clusterName: clusterName,
cache: &serviceCache{serviceMap: make(map[string]*cachedService)},
eventBroadcaster: broadcaster,
eventRecorder: recorder,
nodeLister: nodeInformer.Lister(),
nodeListerSynced: nodeInformer.Informer().HasSynced,
serviceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
@ -219,6 +214,9 @@ func (c *Controller) enqueueNode(obj interface{}) {
// It's an error to call Run() more than once for a given ServiceController
// object.
func (c *Controller) Run(ctx context.Context, workers int, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) {
c.eventBroadcaster = record.NewBroadcaster(record.WithContext(ctx))
c.eventRecorder = c.eventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "service-controller"})
defer runtime.HandleCrash()
defer c.serviceQueue.ShutDown()
defer c.nodeQueue.ShutDown()

View File

@ -51,6 +51,7 @@ import (
fakecloud "k8s.io/cloud-provider/fake"
servicehelper "k8s.io/cloud-provider/service/helpers"
_ "k8s.io/controller-manager/pkg/features/register"
"k8s.io/klog/v2/ktesting"
utilpointer "k8s.io/utils/pointer"
)
@ -150,7 +151,8 @@ func defaultExternalService() *v1.Service {
// node/service informers and reacting to resource events. Callers can also
// specify `objects` which represent the initial state of objects, used to
// populate the client set / informer cache at start-up.
func newController(stopCh <-chan struct{}, objects ...runtime.Object) (*Controller, *fakecloud.Cloud, *fake.Clientset) {
func newController(ctx context.Context, objects ...runtime.Object) (*Controller, *fakecloud.Cloud, *fake.Clientset) {
stopCh := ctx.Done()
cloud := &fakecloud.Cloud{}
cloud.Region = region
@ -158,7 +160,7 @@ func newController(stopCh <-chan struct{}, objects ...runtime.Object) (*Controll
informerFactory := informers.NewSharedInformerFactory(kubeClient, 0)
serviceInformer := informerFactory.Core().V1().Services()
nodeInformer := informerFactory.Core().V1().Nodes()
broadcaster := record.NewBroadcaster()
broadcaster := record.NewBroadcaster(record.WithContext(ctx))
broadcaster.StartStructuredLogging(0)
broadcaster.StartRecordingToSink(&v1core.EventSinkImpl{Interface: kubeClient.CoreV1().Events("")})
recorder := broadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: "service-controller"})
@ -275,9 +277,10 @@ func TestSyncLoadBalancerIfNeeded(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
controller, cloud, client := newController(nil)
controller, cloud, client := newController(ctx)
cloud.Exists = tc.lbExists
key := fmt.Sprintf("%s/%s", tc.service.Namespace, tc.service.Name)
if _, err := client.CoreV1().Services(tc.service.Namespace).Create(ctx, tc.service, metav1.CreateOptions{}); err != nil {
@ -321,7 +324,7 @@ func TestSyncLoadBalancerIfNeeded(t *testing.T) {
}
for _, balancer := range cloud.Balancers {
if balancer.Name != controller.balancer.GetLoadBalancerName(context.Background(), "", tc.service) ||
if balancer.Name != controller.balancer.GetLoadBalancerName(ctx, "", tc.service) ||
balancer.Region != region ||
balancer.Ports[0].Port != tc.service.Spec.Ports[0].Port {
t.Errorf("Created load balancer has incorrect parameters: %v", balancer)
@ -449,9 +452,10 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
}}
for _, item := range table {
t.Run(item.desc, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
controller, cloud, _ := newController(nil)
controller, cloud, _ := newController(ctx)
controller.nodeLister = newFakeNodeLister(nil, nodes...)
if servicesToRetry := controller.updateLoadBalancerHosts(ctx, item.services, item.workers); len(servicesToRetry) != 0 {
t.Errorf("for case %q, unexpected servicesToRetry: %v", item.desc, servicesToRetry)
@ -607,11 +611,12 @@ func TestNodeChangesForStableNodeSetEnabled(t *testing.T) {
expectedUpdateCalls: []fakecloud.UpdateBalancerCall{},
}} {
t.Run(tc.desc, func(t *testing.T) {
controller, cloud, _ := newController(nil)
ctx, cancel := context.WithCancel(context.Background())
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
controller, cloud, _ := newController(ctx)
for _, svc := range services {
key, _ := cache.MetaNamespaceKeyFunc(svc)
controller.lastSyncedNodes[key] = tc.initialState
@ -638,6 +643,10 @@ func TestNodeChangesForStableNodeSetEnabled(t *testing.T) {
}
func TestNodeChangesInExternalLoadBalancer(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
node1 := makeNode(tweakName("node1"))
node2 := makeNode(tweakName("node2"))
node3 := makeNode(tweakName("node3"))
@ -655,7 +664,7 @@ func TestNodeChangesInExternalLoadBalancer(t *testing.T) {
serviceNames.Insert(fmt.Sprintf("%s/%s", svc.GetObjectMeta().GetNamespace(), svc.GetObjectMeta().GetName()))
}
controller, cloud, _ := newController(nil)
controller, cloud, _ := newController(ctx)
for _, tc := range []struct {
desc string
nodes []*v1.Node
@ -715,7 +724,8 @@ func TestNodeChangesInExternalLoadBalancer(t *testing.T) {
expectedRetryServices: serviceNames,
}} {
t.Run(tc.desc, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
controller.nodeLister = newFakeNodeLister(tc.nodeListerErr, tc.nodes...)
servicesToRetry := controller.updateLoadBalancerHosts(ctx, services, tc.worker)
@ -771,7 +781,11 @@ func compareHostSets(t *testing.T, left, right []*v1.Node) bool {
}
func TestNodesNotEqual(t *testing.T) {
controller, cloud, _ := newController(nil)
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
controller, cloud, _ := newController(ctx)
services := []*v1.Service{
newService("s0", v1.ServiceTypeLoadBalancer),
@ -818,7 +832,8 @@ func TestNodesNotEqual(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
controller.nodeLister = newFakeNodeLister(nil, tc.newNodes...)
@ -835,7 +850,11 @@ func TestNodesNotEqual(t *testing.T) {
}
func TestProcessServiceCreateOrUpdate(t *testing.T) {
controller, _, client := newController(nil)
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
controller, _, client := newController(ctx)
//A pair of old and new loadbalancer IP address
oldLBIP := "192.168.1.1"
@ -908,7 +927,8 @@ func TestProcessServiceCreateOrUpdate(t *testing.T) {
}}
for _, tc := range testCases {
ctx, cancel := context.WithCancel(context.Background())
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
newSvc := tc.updateFn(tc.svc)
if _, err := client.CoreV1().Services(tc.svc.Namespace).Create(ctx, tc.svc, metav1.CreateOptions{}); err != nil {
@ -945,12 +965,13 @@ func TestProcessServiceCreateOrUpdateK8sError(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
svc := newService(svcName, v1.ServiceTypeLoadBalancer)
// Preset finalizer so k8s error only happens when patching status.
svc.Finalizers = []string{servicehelper.LoadBalancerCleanupFinalizer}
controller, _, client := newController(nil)
controller, _, client := newController(ctx)
client.PrependReactor("patch", "services", func(action core.Action) (bool, runtime.Object, error) {
return true, nil, tc.k8sErr
})
@ -987,14 +1008,14 @@ func TestSyncService(t *testing.T) {
testCases := []struct {
testName string
key string
updateFn func() //Function to manipulate the controller element to simulate error
expectedFn func(error) error //Expected function if returns nil then test passed, failed otherwise
updateFn func(context.Context) // Function to manipulate the controller element to simulate error
expectedFn func(error) error // Expected function if returns nil then test passed, failed otherwise
}{
{
testName: "if an invalid service name is synced",
key: "invalid/key/string",
updateFn: func() {
controller, _, _ = newController(nil)
updateFn: func(ctx context.Context) {
controller, _, _ = newController(ctx)
},
expectedFn: func(e error) error {
//TODO: should find a way to test for dependent package errors in such a way that it won't break
@ -1024,9 +1045,9 @@ func TestSyncService(t *testing.T) {
{
testName: "if valid service",
key: "external-balancer",
updateFn: func() {
updateFn: func(ctx context.Context) {
testSvc := defaultExternalService()
controller, _, _ = newController(nil)
controller, _, _ = newController(ctx)
controller.enqueueService(testSvc)
svc := controller.cache.getOrCreate("external-balancer")
svc.state = testSvc
@ -1043,10 +1064,11 @@ func TestSyncService(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.testName, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
tc.updateFn()
tc.updateFn(ctx)
obtainedErr := controller.syncService(ctx, tc.key)
//expected matches obtained ??.
@ -1128,11 +1150,12 @@ func TestProcessServiceDeletion(t *testing.T) {
}}
for _, tc := range testCases {
ctx, cancel := context.WithCancel(context.Background())
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
//Create a new controller.
controller, cloud, _ = newController(nil)
controller, cloud, _ = newController(ctx)
tc.updateFn(controller)
obtainedErr := controller.processServiceDeletion(ctx, svcKey)
if err := tc.expectedFn(obtainedErr); err != nil {
@ -1213,8 +1236,10 @@ func TestNeedsCleanup(t *testing.T) {
// make sure that the slow node sync never removes the Node from LB set because it
// has stale data.
func TestSlowNodeSync(t *testing.T) {
stopCh, syncServiceDone, syncService := make(chan struct{}), make(chan string), make(chan string)
defer close(stopCh)
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
syncServiceDone, syncService := make(chan string), make(chan string)
defer close(syncService)
node1 := makeNode(tweakName("node1"))
@ -1227,7 +1252,7 @@ func TestSlowNodeSync(t *testing.T) {
sKey2, _ := cache.MetaNamespaceKeyFunc(service2)
serviceKeys := sets.New(sKey1, sKey2)
controller, cloudProvider, kubeClient := newController(stopCh, node1, node2, service1, service2)
controller, cloudProvider, kubeClient := newController(ctx, node1, node2, service1, service2)
cloudProvider.UpdateCallCb = func(update fakecloud.UpdateBalancerCall) {
key, _ := cache.MetaNamespaceKeyFunc(update.Service)
impactedService := serviceKeys.Difference(sets.New(key)).UnsortedList()[0]
@ -1263,17 +1288,17 @@ func TestSlowNodeSync(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
controller.syncNodes(context.TODO(), 1)
controller.syncNodes(ctx, 1)
}()
key := <-syncService
if _, err := kubeClient.CoreV1().Nodes().Create(context.TODO(), node3, metav1.CreateOptions{}); err != nil {
if _, err := kubeClient.CoreV1().Nodes().Create(ctx, node3, metav1.CreateOptions{}); err != nil {
t.Fatalf("error creating node3, err: %v", err)
}
// Allow a bit of time for the informer cache to get populated with the new
// node
if err := wait.PollUntilContextCancel(wait.ContextForChannel(stopCh), 10*time.Millisecond, true, func(ctx context.Context) (done bool, err error) {
if err := wait.PollUntilContextCancel(ctx, 10*time.Millisecond, true, func(ctx context.Context) (done bool, err error) {
n3, _ := controller.nodeLister.Get("node3")
return n3 != nil, nil
}); err != nil {
@ -1281,7 +1306,7 @@ func TestSlowNodeSync(t *testing.T) {
}
// Sync the service
if err := controller.syncService(context.TODO(), key); err != nil {
if err := controller.syncService(ctx, key); err != nil {
t.Fatalf("unexpected service sync error, err: %v", err)
}
@ -1465,13 +1490,18 @@ func TestNeedsUpdate(t *testing.T) {
expectedNeedsUpdate: true,
}}
controller, _, _ := newController(nil)
for _, tc := range testCases {
oldSvc, newSvc := tc.updateFn()
obtainedResult := controller.needsUpdate(oldSvc, newSvc)
if obtainedResult != tc.expectedNeedsUpdate {
t.Errorf("%v needsUpdate() should have returned %v but returned %v", tc.testName, tc.expectedNeedsUpdate, obtainedResult)
}
t.Run(tc.testName, func(t *testing.T) {
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
controller, _, _ := newController(ctx)
oldSvc, newSvc := tc.updateFn()
obtainedResult := controller.needsUpdate(oldSvc, newSvc)
if obtainedResult != tc.expectedNeedsUpdate {
t.Errorf("%v needsUpdate() should have returned %v but returned %v", tc.testName, tc.expectedNeedsUpdate, obtainedResult)
}
})
}
}
@ -1606,7 +1636,8 @@ func TestAddFinalizer(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
c := fake.NewSimpleClientset()
s := &Controller{
@ -1659,7 +1690,8 @@ func TestRemoveFinalizer(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
c := fake.NewSimpleClientset()
s := &Controller{
@ -1756,7 +1788,8 @@ func TestPatchStatus(t *testing.T) {
for _, tc := range testCases {
t.Run(tc.desc, func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
c := fake.NewSimpleClientset()
s := &Controller{
@ -2277,7 +2310,10 @@ func TestServiceQueueDelay(t *testing.T) {
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
controller, cloud, client := newController(nil)
_, ctx := ktesting.NewTestContext(t)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
controller, cloud, client := newController(ctx)
queue := &spyWorkQueue{RateLimitingInterface: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "test-service-queue-delay")}
controller.serviceQueue = queue
cloud.Err = tc.lbCloudErr
@ -2290,7 +2326,6 @@ func TestServiceQueueDelay(t *testing.T) {
t.Fatalf("adding service %s to cache: %s", svc.Name, err)
}
ctx := context.Background()
_, err := client.CoreV1().Services(ns).Create(ctx, svc, metav1.CreateOptions{})
if err != nil {
t.Fatal(err)

View File

@ -17,6 +17,7 @@ limitations under the License.
package options
import (
"context"
"fmt"
"math/rand"
"net"
@ -221,7 +222,7 @@ func (o *CloudControllerManagerOptions) ApplyTo(c *config.Config, allControllers
return err
}
c.EventBroadcaster = record.NewBroadcaster()
c.EventBroadcaster = record.NewBroadcaster(record.WithContext(context.TODO())) // TODO: move broadcaster construction to a place where there is a proper context.
c.EventRecorder = c.EventBroadcaster.NewRecorder(scheme.Scheme, v1.EventSource{Component: userAgent})
rootClientBuilder := clientbuilder.SimpleControllerClientBuilder{