diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go index 4c9a45510bc..273508e75a1 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go +++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go @@ -74,12 +74,10 @@ type serviceCache struct { // Controller keeps cloud provider service resources // (like load balancers) in sync with the registry. type Controller struct { - cloud cloudprovider.Interface - knownHosts []*v1.Node - servicesToUpdate sets.String - kubeClient clientset.Interface - clusterName string - balancer cloudprovider.LoadBalancer + cloud cloudprovider.Interface + kubeClient clientset.Interface + clusterName string + balancer cloudprovider.LoadBalancer // TODO(#85155): Stop relying on this and remove the cache completely. cache *serviceCache serviceLister corelisters.ServiceLister @@ -88,18 +86,13 @@ type Controller struct { eventRecorder record.EventRecorder nodeLister corelisters.NodeLister nodeListerSynced cache.InformerSynced - // services that need to be synced - queue workqueue.RateLimitingInterface - - // nodeSyncLock ensures there is only one instance of triggerNodeSync getting executed at one time - // and protects internal states (needFullSync) of nodeSync - nodeSyncLock sync.Mutex - // nodeSyncCh triggers nodeSyncLoop to run - nodeSyncCh chan interface{} - // needFullSync indicates if the nodeSyncInternal will do a full node sync on all LB services. - needFullSync bool - // lastSyncedNodes is used when reconciling node state and keeps track of the last synced set of - // nodes. Access to this attribute by multiple go-routines is protected by nodeSyncLock + // services and nodes that need to be synced + serviceQueue workqueue.RateLimitingInterface + nodeQueue workqueue.RateLimitingInterface + // lastSyncedNodes is used when reconciling node state and keeps track of + // the last synced set of nodes. This field is concurrently safe because the + // nodeQueue is serviced by only one go-routine, so node events are not + // processed concurrently. lastSyncedNodes []*v1.Node } @@ -125,7 +118,6 @@ func New( registerMetrics() s := &Controller{ cloud: cloud, - knownHosts: []*v1.Node{}, kubeClient: kubeClient, clusterName: clusterName, cache: &serviceCache{serviceMap: make(map[string]*cachedService)}, @@ -133,10 +125,9 @@ func New( eventRecorder: recorder, nodeLister: nodeInformer.Lister(), nodeListerSynced: nodeInformer.Informer().HasSynced, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"), - // nodeSyncCh has a size 1 buffer. Only one pending sync signal would be cached. 
- nodeSyncCh: make(chan interface{}, 1), - lastSyncedNodes: []*v1.Node{}, + serviceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"), + nodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "node"), + lastSyncedNodes: []*v1.Node{}, } serviceInformer.Informer().AddEventHandlerWithResyncPeriod( @@ -167,7 +158,7 @@ func New( nodeInformer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ AddFunc: func(cur interface{}) { - s.triggerNodeSync() + s.enqueueNode(cur) }, UpdateFunc: func(old, cur interface{}) { oldNode, ok := old.(*v1.Node) @@ -184,13 +175,13 @@ func New( return } - s.triggerNodeSync() + s.enqueueNode(curNode) }, DeleteFunc: func(old interface{}) { - s.triggerNodeSync() + s.enqueueNode(old) }, }, - time.Duration(0), + nodeSyncPeriod, ) if err := s.init(); err != nil { @@ -200,15 +191,6 @@ func New( return s, nil } -// needFullSyncAndUnmark returns the value and needFullSync and marks the field to false. -func (c *Controller) needFullSyncAndUnmark() bool { - c.nodeSyncLock.Lock() - defer c.nodeSyncLock.Unlock() - ret := c.needFullSync - c.needFullSync = false - return ret -} - // obj could be an *v1.Service, or a DeletionFinalStateUnknown marker item. func (c *Controller) enqueueService(obj interface{}) { key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) @@ -216,7 +198,17 @@ func (c *Controller) enqueueService(obj interface{}) { runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", obj, err)) return } - c.queue.Add(key) + c.serviceQueue.Add(key) +} + +// obj could be an *v1.Service, or a DeletionFinalStateUnknown marker item. +func (c *Controller) enqueueNode(obj interface{}) { + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err != nil { + runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", obj, err)) + return + } + c.nodeQueue.Add(key) } // Run starts a background goroutine that watches for changes to services that @@ -231,7 +223,8 @@ func (c *Controller) enqueueService(obj interface{}) { // object. func (c *Controller) Run(ctx context.Context, workers int, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) { defer runtime.HandleCrash() - defer c.queue.ShutDown() + defer c.serviceQueue.ShutDown() + defer c.nodeQueue.ShutDown() // Start event processing pipeline. c.eventBroadcaster.StartStructuredLogging(0) @@ -248,81 +241,60 @@ func (c *Controller) Run(ctx context.Context, workers int, controllerManagerMetr } for i := 0; i < workers; i++ { - go wait.UntilWithContext(ctx, c.worker, time.Second) + go wait.UntilWithContext(ctx, c.serviceWorker, time.Second) } - go c.nodeSyncLoop(ctx, workers) - go wait.Until(c.triggerNodeSync, nodeSyncPeriod, ctx.Done()) + // Initialize one go-routine servicing node events. This ensure we only + // process one node at any given moment in time + go wait.UntilWithContext(ctx, func(ctx context.Context) { c.nodeWorker(ctx, workers) }, time.Second) <-ctx.Done() } -// triggerNodeSync triggers a nodeSync asynchronously -func (c *Controller) triggerNodeSync() { - c.nodeSyncLock.Lock() - defer c.nodeSyncLock.Unlock() - newHosts, err := listWithPredicates(c.nodeLister, allNodePredicates...) - if err != nil { - runtime.HandleError(fmt.Errorf("Failed to retrieve current set of nodes from node lister: %v", err)) - // if node list cannot be retrieve, trigger full node sync to be safe. 
- c.needFullSync = true - } else if !nodeSlicesEqualForLB(newHosts, c.knownHosts) { - // Here the last known state is recorded as knownHosts. For each - // LB update, the latest node list is retrieved. This is to prevent - // a stale set of nodes were used to be update loadbalancers when - // there are many loadbalancers in the clusters. nodeSyncInternal - // would be triggered until all loadbalancers are updated to the new state. - klog.V(2).Infof("Node changes detected, triggering a full node sync on all loadbalancer services") - c.needFullSync = true - c.knownHosts = newHosts - } - - select { - case c.nodeSyncCh <- struct{}{}: - klog.V(4).Info("Triggering nodeSync") - return - default: - klog.V(4).Info("A pending nodeSync is already in queue") - return +// worker runs a worker thread that just dequeues items, processes them, and marks them done. +// It enforces that the syncHandler is never invoked concurrently with the same key. +func (c *Controller) serviceWorker(ctx context.Context) { + for c.processNextServiceItem(ctx) { } } // worker runs a worker thread that just dequeues items, processes them, and marks them done. // It enforces that the syncHandler is never invoked concurrently with the same key. -func (c *Controller) worker(ctx context.Context) { - for c.processNextWorkItem(ctx) { +func (c *Controller) nodeWorker(ctx context.Context, workers int) { + for c.processNextNodeItem(ctx, workers) { } } -// nodeSyncLoop takes nodeSync signal and triggers nodeSync -func (c *Controller) nodeSyncLoop(ctx context.Context, workers int) { - klog.V(4).Info("nodeSyncLoop Started") - for { - select { - case <-c.nodeSyncCh: - klog.V(4).Info("nodeSync has been triggered") - c.nodeSyncInternal(ctx, workers) - case <-ctx.Done(): - return - } - } -} - -func (c *Controller) processNextWorkItem(ctx context.Context) bool { - key, quit := c.queue.Get() +func (c *Controller) processNextNodeItem(ctx context.Context, workers int) bool { + key, quit := c.nodeQueue.Get() if quit { return false } - defer c.queue.Done(key) + defer c.nodeQueue.Done(key) + + for serviceToRetry := range c.syncNodes(ctx, workers) { + c.serviceQueue.Add(serviceToRetry) + } + + c.nodeQueue.Forget(key) + return true +} + +func (c *Controller) processNextServiceItem(ctx context.Context) bool { + key, quit := c.serviceQueue.Get() + if quit { + return false + } + defer c.serviceQueue.Done(key) err := c.syncService(ctx, key.(string)) if err == nil { - c.queue.Forget(key) + c.serviceQueue.Forget(key) return true } runtime.HandleError(fmt.Errorf("error processing service %v (will retry): %v", key, err)) - c.queue.AddRateLimited(key) + c.serviceQueue.AddRateLimited(key) return true } @@ -688,13 +660,6 @@ func nodeNames(nodes []*v1.Node) sets.String { return ret } -func nodeSlicesEqualForLB(x, y []*v1.Node) bool { - if len(x) != len(y) { - return false - } - return nodeNames(x).Equal(nodeNames(y)) -} - func shouldSyncUpdatedNode(oldNode, newNode *v1.Node) bool { // Evaluate the individual node exclusion predicate before evaluating the // compounded result of all predicates. We don't sync ETP=local services @@ -712,43 +677,23 @@ func shouldSyncUpdatedNode(oldNode, newNode *v1.Node) bool { return respectsPredicates(oldNode, allNodePredicates...) != respectsPredicates(newNode, allNodePredicates...) } -// nodeSyncInternal handles updating the hosts pointed to by all load +// syncNodes handles updating the hosts pointed to by all load // balancers whenever the set of nodes in the cluster changes. 
-func (c *Controller) nodeSyncInternal(ctx context.Context, workers int) { +func (c *Controller) syncNodes(ctx context.Context, workers int) sets.String { startTime := time.Now() defer func() { latency := time.Since(startTime).Seconds() - klog.V(4).Infof("It took %v seconds to finish nodeSyncInternal", latency) + klog.V(4).Infof("It took %v seconds to finish syncNodes", latency) nodeSyncLatency.Observe(latency) }() - if !c.needFullSyncAndUnmark() { - // The set of nodes in the cluster hasn't changed, but we can retry - // updating any services that we failed to update last time around. - // It is required to call `c.cache.get()` on each Service in case there was - // an update event that occurred between retries. - var servicesToUpdate []*v1.Service - for key := range c.servicesToUpdate { - cachedService, exist := c.cache.get(key) - if !exist { - klog.Errorf("Service %q should be in the cache but not", key) - continue - } - servicesToUpdate = append(servicesToUpdate, cachedService.state) - } - - c.servicesToUpdate = c.updateLoadBalancerHosts(ctx, servicesToUpdate, workers) - return - } klog.V(2).Infof("Syncing backends for all LB services.") - - // Try updating all services, and save the failed ones to try again next - // round. servicesToUpdate := c.cache.allServices() numServices := len(servicesToUpdate) - c.servicesToUpdate = c.updateLoadBalancerHosts(ctx, servicesToUpdate, workers) + servicesToRetry := c.updateLoadBalancerHosts(ctx, servicesToUpdate, workers) klog.V(2).Infof("Successfully updated %d out of %d load balancers to direct traffic to the updated set of nodes", - numServices-len(c.servicesToUpdate), numServices) + numServices-len(servicesToRetry), numServices) + return servicesToRetry } // nodeSyncService syncs the nodes for one load balancer type service. 
The return value @@ -803,7 +748,6 @@ func (c *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1 key := fmt.Sprintf("%s/%s", services[piece].Namespace, services[piece].Name) servicesToRetry.Insert(key) } - workqueue.ParallelizeUntil(ctx, workers, len(services), doWork) c.lastSyncedNodes = nodes klog.V(4).Infof("Finished updateLoadBalancerHosts") diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go index c6079a1b515..df0daf1fc0e 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go +++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go @@ -26,6 +26,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -46,8 +47,6 @@ import ( servicehelper "k8s.io/cloud-provider/service/helpers" utilpointer "k8s.io/utils/pointer" - - "github.com/stretchr/testify/assert" ) const region = "us-central" @@ -101,7 +100,6 @@ func newController() (*Controller, *fakecloud.Cloud, *fake.Clientset) { controller := &Controller{ cloud: cloud, - knownHosts: []*v1.Node{}, kubeClient: kubeClient, clusterName: "test-cluster", cache: &serviceCache{serviceMap: make(map[string]*cachedService)}, @@ -109,8 +107,8 @@ func newController() (*Controller, *fakecloud.Cloud, *fake.Clientset) { eventRecorder: recorder, nodeLister: newFakeNodeLister(nil), nodeListerSynced: nodeInformer.Informer().HasSynced, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"), - nodeSyncCh: make(chan interface{}, 1), + serviceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"), + nodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "node"), lastSyncedNodes: []*v1.Node{}, } @@ -566,7 +564,6 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) { defer cancel() controller, cloud, _ := newController() controller.nodeLister = newFakeNodeLister(nil, nodes...) 
- if servicesToRetry := controller.updateLoadBalancerHosts(ctx, item.services, item.workers); len(servicesToRetry) != 0 { t.Errorf("for case %q, unexpected servicesToRetry: %v", item.desc, servicesToRetry) } @@ -908,7 +905,7 @@ func TestProcessServiceCreateOrUpdate(t *testing.T) { cachedServiceTest.state = svc controller.cache.set(keyExpected, cachedServiceTest) - keyGot, quit := controller.queue.Get() + keyGot, quit := controller.serviceQueue.Get() if quit { t.Fatalf("get no queue element") } @@ -1516,28 +1513,6 @@ func TestServiceCache(t *testing.T) { } } -// Test a utility functions as it's not easy to unit test nodeSyncInternal directly -func TestNodeSlicesEqualForLB(t *testing.T) { - numNodes := 10 - nArray := make([]*v1.Node, numNodes) - mArray := make([]*v1.Node, numNodes) - for i := 0; i < numNodes; i++ { - nArray[i] = &v1.Node{} - nArray[i].Name = fmt.Sprintf("node%d", i) - } - for i := 0; i < numNodes; i++ { - mArray[i] = &v1.Node{} - mArray[i].Name = fmt.Sprintf("node%d", i+1) - } - - if !nodeSlicesEqualForLB(nArray, nArray) { - t.Errorf("nodeSlicesEqualForLB() Expected=true Obtained=false") - } - if nodeSlicesEqualForLB(nArray, mArray) { - t.Errorf("nodeSlicesEqualForLB() Expected=false Obtained=true") - } -} - // TODO(@MrHohn): Verify the end state when below issue is resolved: // https://github.com/kubernetes/client-go/issues/607 func TestAddFinalizer(t *testing.T) { @@ -3209,66 +3184,6 @@ func Test_shouldSyncUpdatedNode_compoundedPredicates(t *testing.T) { } } -func TestTriggerNodeSync(t *testing.T) { - controller, _, _ := newController() - - tryReadFromChannel(t, controller.nodeSyncCh, false) - controller.triggerNodeSync() - tryReadFromChannel(t, controller.nodeSyncCh, true) - tryReadFromChannel(t, controller.nodeSyncCh, false) - tryReadFromChannel(t, controller.nodeSyncCh, false) - tryReadFromChannel(t, controller.nodeSyncCh, false) - controller.triggerNodeSync() - controller.triggerNodeSync() - controller.triggerNodeSync() - tryReadFromChannel(t, controller.nodeSyncCh, true) - tryReadFromChannel(t, controller.nodeSyncCh, false) - tryReadFromChannel(t, controller.nodeSyncCh, false) - tryReadFromChannel(t, controller.nodeSyncCh, false) -} - -func TestMarkAndUnmarkFullSync(t *testing.T) { - controller, _, _ := newController() - if controller.needFullSync != false { - t.Errorf("expect controller.needFullSync to be false, but got true") - } - - ret := controller.needFullSyncAndUnmark() - if ret != false { - t.Errorf("expect ret == false, but got true") - } - - ret = controller.needFullSyncAndUnmark() - if ret != false { - t.Errorf("expect ret == false, but got true") - } - controller.needFullSync = true - ret = controller.needFullSyncAndUnmark() - if ret != true { - t.Errorf("expect ret == true, but got false") - } - ret = controller.needFullSyncAndUnmark() - if ret != false { - t.Errorf("expect ret == false, but got true") - } -} - -func tryReadFromChannel(t *testing.T, ch chan interface{}, expectValue bool) { - select { - case _, ok := <-ch: - if !ok { - t.Errorf("The channel is closed") - } - if !expectValue { - t.Errorf("Does not expect value from the channel, but got a value") - } - default: - if expectValue { - t.Errorf("Expect value from the channel, but got none") - } - } -} - type fakeNodeLister struct { cache []*v1.Node err error
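To make the control flow introduced by this patch easier to follow, below is a minimal, self-contained Go sketch (not part of the patch, and not the controller's real code) of the pattern the diff moves to: node events go onto a second rate-limited workqueue that is serviced by exactly one worker, and any load-balancer services that fail to update are handed back to the service queue for a rate-limited retry. Because only one goroutine ever drains the node queue, the last-synced node set needs no mutex, which is why the diff can delete nodeSyncLock, nodeSyncCh, and needFullSync. The retry-delay constants, the placeholder syncNodes/syncService bodies, and the example keys are illustrative assumptions.

package main

import (
	"context"
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/sets"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/util/workqueue"
)

const (
	minRetryDelay = 5 * time.Second   // illustrative retry bounds (assumed),
	maxRetryDelay = 300 * time.Second // the real controller defines its own
)

type controller struct {
	serviceQueue workqueue.RateLimitingInterface
	nodeQueue    workqueue.RateLimitingInterface
}

func newController() *controller {
	return &controller{
		serviceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
		nodeQueue:    workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "node"),
	}
}

// run starts several service workers but exactly one node worker, so node
// events are never processed concurrently; that single-writer property is
// what lets the patched controller drop the lock around lastSyncedNodes.
func (c *controller) run(ctx context.Context, workers int) {
	defer c.serviceQueue.ShutDown()
	defer c.nodeQueue.ShutDown()
	for i := 0; i < workers; i++ {
		go wait.UntilWithContext(ctx, c.serviceWorker, time.Second)
	}
	go wait.UntilWithContext(ctx, c.nodeWorker, time.Second)
	<-ctx.Done()
}

func (c *controller) nodeWorker(ctx context.Context) {
	for c.processNextNodeItem(ctx) {
	}
}

// processNextNodeItem drains one node event and re-queues any services that
// failed to update so they get a rate-limited retry on the service queue.
func (c *controller) processNextNodeItem(ctx context.Context) bool {
	key, quit := c.nodeQueue.Get()
	if quit {
		return false
	}
	defer c.nodeQueue.Done(key)
	for serviceKey := range c.syncNodes(ctx) {
		c.serviceQueue.Add(serviceKey)
	}
	c.nodeQueue.Forget(key)
	return true
}

// syncNodes is a stand-in: update every LB service for the current node set
// and return the keys of the services that failed and should be retried.
func (c *controller) syncNodes(ctx context.Context) sets.String {
	fmt.Println("syncing backends for all LB services")
	return sets.NewString() // nothing to retry in this sketch
}

func (c *controller) serviceWorker(ctx context.Context) {
	for c.processNextServiceItem(ctx) {
	}
}

func (c *controller) processNextServiceItem(ctx context.Context) bool {
	key, quit := c.serviceQueue.Get()
	if quit {
		return false
	}
	defer c.serviceQueue.Done(key)
	if err := c.syncService(ctx, key.(string)); err != nil {
		c.serviceQueue.AddRateLimited(key) // exponential per-key backoff
		return true
	}
	c.serviceQueue.Forget(key)
	return true
}

func (c *controller) syncService(ctx context.Context, key string) error {
	fmt.Println("syncing service", key)
	return nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
	defer cancel()
	c := newController()
	c.nodeQueue.Add("node-a") // a node event, keyed by node name
	c.run(ctx, 2)
}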