From 72c1c6559eb5c74000d8859141772b8025538985 Mon Sep 17 00:00:00 2001 From: Alexander Constantinescu Date: Thu, 21 Jul 2022 10:35:45 +0200 Subject: [PATCH 1/2] Cleanup service sync path It dawned on me that `needFullSync` can never be false. `needFullSync` was used to compare the set of nodes that existed the last time the node event handler was triggered, with the current set of nodes for this run. However, if `triggerNodeSync` gets called it's always because the set of nodes has changed due to a condition changing on one node, or a new node being added/removed. If `needFullSync` can never be false then a lot of things in the service sync path were just spurious, for ex: `servicesToRetry`, `knownHosts`. Essentially: if we ever need to `triggerNodeSync` then the set of nodes has somehow changed and we always need to re-sync all services. Before this patch series there was a possibility for `needFullSync` to be set to false. `shouldSyncNode` and the predicates used to list nodes were not aligned, specifically for Unschedulable nodes. This means that we could have been triggered by a change to the schedulable state but not actually computed any diffs between the old vs. new nodes. Meaning, whenever there was a change in schedulable state we would just try to re-sync all service updates that might have failed when we synced last time. But I believe this to be an overlooked coincidence, rather than something actually intended. 
--- .../controllers/service/controller.go | 71 ++----------------- .../controllers/service/controller_test.go | 53 +------------- 2 files changed, 7 insertions(+), 117 deletions(-) diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go index 4c9a45510bc..f6fc1a03231 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go +++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go @@ -74,12 +74,10 @@ type serviceCache struct { // Controller keeps cloud provider service resources // (like load balancers) in sync with the registry. type Controller struct { - cloud cloudprovider.Interface - knownHosts []*v1.Node - servicesToUpdate sets.String - kubeClient clientset.Interface - clusterName string - balancer cloudprovider.LoadBalancer + cloud cloudprovider.Interface + kubeClient clientset.Interface + clusterName string + balancer cloudprovider.LoadBalancer // TODO(#85155): Stop relying on this and remove the cache completely. cache *serviceCache serviceLister corelisters.ServiceLister @@ -96,8 +94,6 @@ type Controller struct { nodeSyncLock sync.Mutex // nodeSyncCh triggers nodeSyncLoop to run nodeSyncCh chan interface{} - // needFullSync indicates if the nodeSyncInternal will do a full node sync on all LB services. - needFullSync bool // lastSyncedNodes is used when reconciling node state and keeps track of the last synced set of // nodes. Access to this attribute by multiple go-routines is protected by nodeSyncLock lastSyncedNodes []*v1.Node @@ -125,7 +121,6 @@ func New( registerMetrics() s := &Controller{ cloud: cloud, - knownHosts: []*v1.Node{}, kubeClient: kubeClient, clusterName: clusterName, cache: &serviceCache{serviceMap: make(map[string]*cachedService)}, @@ -200,15 +195,6 @@ func New( return s, nil } -// needFullSyncAndUnmark returns the value and needFullSync and marks the field to false. 
-func (c *Controller) needFullSyncAndUnmark() bool { - c.nodeSyncLock.Lock() - defer c.nodeSyncLock.Unlock() - ret := c.needFullSync - c.needFullSync = false - return ret -} - // obj could be an *v1.Service, or a DeletionFinalStateUnknown marker item. func (c *Controller) enqueueService(obj interface{}) { key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) @@ -261,22 +247,6 @@ func (c *Controller) Run(ctx context.Context, workers int, controllerManagerMetr func (c *Controller) triggerNodeSync() { c.nodeSyncLock.Lock() defer c.nodeSyncLock.Unlock() - newHosts, err := listWithPredicates(c.nodeLister, allNodePredicates...) - if err != nil { - runtime.HandleError(fmt.Errorf("Failed to retrieve current set of nodes from node lister: %v", err)) - // if node list cannot be retrieve, trigger full node sync to be safe. - c.needFullSync = true - } else if !nodeSlicesEqualForLB(newHosts, c.knownHosts) { - // Here the last known state is recorded as knownHosts. For each - // LB update, the latest node list is retrieved. This is to prevent - // a stale set of nodes were used to be update loadbalancers when - // there are many loadbalancers in the clusters. nodeSyncInternal - // would be triggered until all loadbalancers are updated to the new state. - klog.V(2).Infof("Node changes detected, triggering a full node sync on all loadbalancer services") - c.needFullSync = true - c.knownHosts = newHosts - } - select { case c.nodeSyncCh <- struct{}{}: klog.V(4).Info("Triggering nodeSync") @@ -688,13 +658,6 @@ func nodeNames(nodes []*v1.Node) sets.String { return ret } -func nodeSlicesEqualForLB(x, y []*v1.Node) bool { - if len(x) != len(y) { - return false - } - return nodeNames(x).Equal(nodeNames(y)) -} - func shouldSyncUpdatedNode(oldNode, newNode *v1.Node) bool { // Evaluate the individual node exclusion predicate before evaluating the // compounded result of all predicates. 
We don't sync ETP=local services @@ -722,33 +685,12 @@ func (c *Controller) nodeSyncInternal(ctx context.Context, workers int) { nodeSyncLatency.Observe(latency) }() - if !c.needFullSyncAndUnmark() { - // The set of nodes in the cluster hasn't changed, but we can retry - // updating any services that we failed to update last time around. - // It is required to call `c.cache.get()` on each Service in case there was - // an update event that occurred between retries. - var servicesToUpdate []*v1.Service - for key := range c.servicesToUpdate { - cachedService, exist := c.cache.get(key) - if !exist { - klog.Errorf("Service %q should be in the cache but not", key) - continue - } - servicesToUpdate = append(servicesToUpdate, cachedService.state) - } - - c.servicesToUpdate = c.updateLoadBalancerHosts(ctx, servicesToUpdate, workers) - return - } klog.V(2).Infof("Syncing backends for all LB services.") - - // Try updating all services, and save the failed ones to try again next - // round. servicesToUpdate := c.cache.allServices() numServices := len(servicesToUpdate) - c.servicesToUpdate = c.updateLoadBalancerHosts(ctx, servicesToUpdate, workers) + servicesToRetry := c.updateLoadBalancerHosts(ctx, servicesToUpdate, workers) klog.V(2).Infof("Successfully updated %d out of %d load balancers to direct traffic to the updated set of nodes", - numServices-len(c.servicesToUpdate), numServices) + numServices-len(servicesToRetry), numServices) } // nodeSyncService syncs the nodes for one load balancer type service. 
The return value @@ -803,7 +745,6 @@ func (c *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1 key := fmt.Sprintf("%s/%s", services[piece].Namespace, services[piece].Name) servicesToRetry.Insert(key) } - workqueue.ParallelizeUntil(ctx, workers, len(services), doWork) c.lastSyncedNodes = nodes klog.V(4).Infof("Finished updateLoadBalancerHosts") diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go index c6079a1b515..34c694e9ce0 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go +++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go @@ -26,6 +26,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -46,8 +47,6 @@ import ( servicehelper "k8s.io/cloud-provider/service/helpers" utilpointer "k8s.io/utils/pointer" - - "github.com/stretchr/testify/assert" ) const region = "us-central" @@ -101,7 +100,6 @@ func newController() (*Controller, *fakecloud.Cloud, *fake.Clientset) { controller := &Controller{ cloud: cloud, - knownHosts: []*v1.Node{}, kubeClient: kubeClient, clusterName: "test-cluster", cache: &serviceCache{serviceMap: make(map[string]*cachedService)}, @@ -566,7 +564,6 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) { defer cancel() controller, cloud, _ := newController() controller.nodeLister = newFakeNodeLister(nil, nodes...) 
- if servicesToRetry := controller.updateLoadBalancerHosts(ctx, item.services, item.workers); len(servicesToRetry) != 0 { t.Errorf("for case %q, unexpected servicesToRetry: %v", item.desc, servicesToRetry) } @@ -1516,28 +1513,6 @@ func TestServiceCache(t *testing.T) { } } -// Test a utility functions as it's not easy to unit test nodeSyncInternal directly -func TestNodeSlicesEqualForLB(t *testing.T) { - numNodes := 10 - nArray := make([]*v1.Node, numNodes) - mArray := make([]*v1.Node, numNodes) - for i := 0; i < numNodes; i++ { - nArray[i] = &v1.Node{} - nArray[i].Name = fmt.Sprintf("node%d", i) - } - for i := 0; i < numNodes; i++ { - mArray[i] = &v1.Node{} - mArray[i].Name = fmt.Sprintf("node%d", i+1) - } - - if !nodeSlicesEqualForLB(nArray, nArray) { - t.Errorf("nodeSlicesEqualForLB() Expected=true Obtained=false") - } - if nodeSlicesEqualForLB(nArray, mArray) { - t.Errorf("nodeSlicesEqualForLB() Expected=false Obtained=true") - } -} - // TODO(@MrHohn): Verify the end state when below issue is resolved: // https://github.com/kubernetes/client-go/issues/607 func TestAddFinalizer(t *testing.T) { @@ -3227,32 +3202,6 @@ func TestTriggerNodeSync(t *testing.T) { tryReadFromChannel(t, controller.nodeSyncCh, false) } -func TestMarkAndUnmarkFullSync(t *testing.T) { - controller, _, _ := newController() - if controller.needFullSync != false { - t.Errorf("expect controller.needFullSync to be false, but got true") - } - - ret := controller.needFullSyncAndUnmark() - if ret != false { - t.Errorf("expect ret == false, but got true") - } - - ret = controller.needFullSyncAndUnmark() - if ret != false { - t.Errorf("expect ret == false, but got true") - } - controller.needFullSync = true - ret = controller.needFullSyncAndUnmark() - if ret != true { - t.Errorf("expect ret == true, but got false") - } - ret = controller.needFullSyncAndUnmark() - if ret != false { - t.Errorf("expect ret == false, but got true") - } -} - func tryReadFromChannel(t *testing.T, ch chan interface{}, 
expectValue bool) { select { case _, ok := <-ch: From c44ab14e202d5b78a8bd4d9e592057063fc1b9b8 Mon Sep 17 00:00:00 2001 From: Alexander Constantinescu Date: Thu, 21 Jul 2022 11:14:26 +0200 Subject: [PATCH 2/2] Service controller: use a workqueue for node updates Services which fail to be successfully synced as a consequence of a triggered node event are actually never retried. The commit before this one gave an example of when such services used to be retried before, but that was not really efficient nor fully correct. Moving to a workqueue for node events is a more modern approach to syncing nodes, and placing any service keys that fail on the service workqueue fixes the re-sync problem. Also, now that we are using a node workqueue and use one go-routine to service items from that queue, we don't need the `nodeSyncLock` anymore. So further clean that up from the controller. --- .../controllers/service/controller.go | 119 +++++++++--------- .../controllers/service/controller_test.go | 40 +----- 2 files changed, 64 insertions(+), 95 deletions(-) diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go index f6fc1a03231..273508e75a1 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go +++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go @@ -86,16 +86,13 @@ type Controller struct { eventRecorder record.EventRecorder nodeLister corelisters.NodeLister nodeListerSynced cache.InformerSynced - // services that need to be synced - queue workqueue.RateLimitingInterface - - // nodeSyncLock ensures there is only one instance of triggerNodeSync getting executed at one time - // and protects internal states (needFullSync) of nodeSync - nodeSyncLock sync.Mutex - // nodeSyncCh triggers nodeSyncLoop to run - nodeSyncCh chan interface{} - // lastSyncedNodes is used when reconciling node state and keeps track of the last 
synced set of - // nodes. Access to this attribute by multiple go-routines is protected by nodeSyncLock + // services and nodes that need to be synced + serviceQueue workqueue.RateLimitingInterface + nodeQueue workqueue.RateLimitingInterface + // lastSyncedNodes is used when reconciling node state and keeps track of + // the last synced set of nodes. This field is concurrently safe because the + // nodeQueue is serviced by only one go-routine, so node events are not + // processed concurrently. lastSyncedNodes []*v1.Node } @@ -128,10 +125,9 @@ func New( eventRecorder: recorder, nodeLister: nodeInformer.Lister(), nodeListerSynced: nodeInformer.Informer().HasSynced, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"), - // nodeSyncCh has a size 1 buffer. Only one pending sync signal would be cached. - nodeSyncCh: make(chan interface{}, 1), - lastSyncedNodes: []*v1.Node{}, + serviceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"), + nodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "node"), + lastSyncedNodes: []*v1.Node{}, } serviceInformer.Informer().AddEventHandlerWithResyncPeriod( @@ -162,7 +158,7 @@ func New( nodeInformer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ AddFunc: func(cur interface{}) { - s.triggerNodeSync() + s.enqueueNode(cur) }, UpdateFunc: func(old, cur interface{}) { oldNode, ok := old.(*v1.Node) @@ -179,13 +175,13 @@ func New( return } - s.triggerNodeSync() + s.enqueueNode(curNode) }, DeleteFunc: func(old interface{}) { - s.triggerNodeSync() + s.enqueueNode(old) }, }, - time.Duration(0), + nodeSyncPeriod, ) if err := s.init(); err != nil { @@ -202,7 +198,17 @@ func (c *Controller) enqueueService(obj interface{}) { runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: 
%v", obj, err)) return } - c.queue.Add(key) + c.serviceQueue.Add(key) +} + +// obj could be a *v1.Node, or a DeletionFinalStateUnknown marker item. +func (c *Controller) enqueueNode(obj interface{}) { + key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) + if err != nil { + runtime.HandleError(fmt.Errorf("couldn't get key for object %#v: %v", obj, err)) + return + } + c.nodeQueue.Add(key) } // Run starts a background goroutine that watches for changes to services that @@ -217,7 +223,8 @@ func (c *Controller) enqueueService(obj interface{}) { // object. func (c *Controller) Run(ctx context.Context, workers int, controllerManagerMetrics *controllersmetrics.ControllerManagerMetrics) { defer runtime.HandleCrash() - defer c.queue.ShutDown() + defer c.serviceQueue.ShutDown() + defer c.nodeQueue.ShutDown() // Start event processing pipeline. c.eventBroadcaster.StartStructuredLogging(0) @@ -234,65 +241,60 @@ func (c *Controller) Run(ctx context.Context, workers int, controllerManagerMetr } for i := 0; i < workers; i++ { - go wait.UntilWithContext(ctx, c.worker, time.Second) + go wait.UntilWithContext(ctx, c.serviceWorker, time.Second) } - go c.nodeSyncLoop(ctx, workers) - go wait.Until(c.triggerNodeSync, nodeSyncPeriod, ctx.Done()) + // Initialize one go-routine servicing node events. This ensures we only + // process one node at any given moment in time + go wait.UntilWithContext(ctx, func(ctx context.Context) { c.nodeWorker(ctx, workers) }, time.Second) <-ctx.Done() } -// triggerNodeSync triggers a nodeSync asynchronously -func (c *Controller) triggerNodeSync() { - c.nodeSyncLock.Lock() - defer c.nodeSyncLock.Unlock() - select { - case c.nodeSyncCh <- struct{}{}: - klog.V(4).Info("Triggering nodeSync") - return - default: - klog.V(4).Info("A pending nodeSync is already in queue") - return +// serviceWorker runs a worker thread that just dequeues items, processes them, and marks them done. 
+// It enforces that the syncHandler is never invoked concurrently with the same key. +func (c *Controller) serviceWorker(ctx context.Context) { + for c.processNextServiceItem(ctx) { } } // worker runs a worker thread that just dequeues items, processes them, and marks them done. // It enforces that the syncHandler is never invoked concurrently with the same key. -func (c *Controller) worker(ctx context.Context) { - for c.processNextWorkItem(ctx) { +func (c *Controller) nodeWorker(ctx context.Context, workers int) { + for c.processNextNodeItem(ctx, workers) { } } -// nodeSyncLoop takes nodeSync signal and triggers nodeSync -func (c *Controller) nodeSyncLoop(ctx context.Context, workers int) { - klog.V(4).Info("nodeSyncLoop Started") - for { - select { - case <-c.nodeSyncCh: - klog.V(4).Info("nodeSync has been triggered") - c.nodeSyncInternal(ctx, workers) - case <-ctx.Done(): - return - } - } -} - -func (c *Controller) processNextWorkItem(ctx context.Context) bool { - key, quit := c.queue.Get() +func (c *Controller) processNextNodeItem(ctx context.Context, workers int) bool { + key, quit := c.nodeQueue.Get() if quit { return false } - defer c.queue.Done(key) + defer c.nodeQueue.Done(key) + + for serviceToRetry := range c.syncNodes(ctx, workers) { + c.serviceQueue.Add(serviceToRetry) + } + + c.nodeQueue.Forget(key) + return true +} + +func (c *Controller) processNextServiceItem(ctx context.Context) bool { + key, quit := c.serviceQueue.Get() + if quit { + return false + } + defer c.serviceQueue.Done(key) err := c.syncService(ctx, key.(string)) if err == nil { - c.queue.Forget(key) + c.serviceQueue.Forget(key) return true } runtime.HandleError(fmt.Errorf("error processing service %v (will retry): %v", key, err)) - c.queue.AddRateLimited(key) + c.serviceQueue.AddRateLimited(key) return true } @@ -675,13 +677,13 @@ func shouldSyncUpdatedNode(oldNode, newNode *v1.Node) bool { return respectsPredicates(oldNode, allNodePredicates...) 
!= respectsPredicates(newNode, allNodePredicates...) } -// nodeSyncInternal handles updating the hosts pointed to by all load +// syncNodes handles updating the hosts pointed to by all load // balancers whenever the set of nodes in the cluster changes. -func (c *Controller) nodeSyncInternal(ctx context.Context, workers int) { +func (c *Controller) syncNodes(ctx context.Context, workers int) sets.String { startTime := time.Now() defer func() { latency := time.Since(startTime).Seconds() - klog.V(4).Infof("It took %v seconds to finish nodeSyncInternal", latency) + klog.V(4).Infof("It took %v seconds to finish syncNodes", latency) nodeSyncLatency.Observe(latency) }() @@ -691,6 +693,7 @@ func (c *Controller) nodeSyncInternal(ctx context.Context, workers int) { servicesToRetry := c.updateLoadBalancerHosts(ctx, servicesToUpdate, workers) klog.V(2).Infof("Successfully updated %d out of %d load balancers to direct traffic to the updated set of nodes", numServices-len(servicesToRetry), numServices) + return servicesToRetry } // nodeSyncService syncs the nodes for one load balancer type service. 
The return value diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go index 34c694e9ce0..df0daf1fc0e 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go +++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go @@ -107,8 +107,8 @@ func newController() (*Controller, *fakecloud.Cloud, *fake.Clientset) { eventRecorder: recorder, nodeLister: newFakeNodeLister(nil), nodeListerSynced: nodeInformer.Informer().HasSynced, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"), - nodeSyncCh: make(chan interface{}, 1), + serviceQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"), + nodeQueue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "node"), lastSyncedNodes: []*v1.Node{}, } @@ -905,7 +905,7 @@ func TestProcessServiceCreateOrUpdate(t *testing.T) { cachedServiceTest.state = svc controller.cache.set(keyExpected, cachedServiceTest) - keyGot, quit := controller.queue.Get() + keyGot, quit := controller.serviceQueue.Get() if quit { t.Fatalf("get no queue element") } @@ -3184,40 +3184,6 @@ func Test_shouldSyncUpdatedNode_compoundedPredicates(t *testing.T) { } } -func TestTriggerNodeSync(t *testing.T) { - controller, _, _ := newController() - - tryReadFromChannel(t, controller.nodeSyncCh, false) - controller.triggerNodeSync() - tryReadFromChannel(t, controller.nodeSyncCh, true) - tryReadFromChannel(t, controller.nodeSyncCh, false) - tryReadFromChannel(t, controller.nodeSyncCh, false) - tryReadFromChannel(t, controller.nodeSyncCh, false) - controller.triggerNodeSync() - controller.triggerNodeSync() - controller.triggerNodeSync() - tryReadFromChannel(t, controller.nodeSyncCh, true) - 
tryReadFromChannel(t, controller.nodeSyncCh, false) - tryReadFromChannel(t, controller.nodeSyncCh, false) - tryReadFromChannel(t, controller.nodeSyncCh, false) -} - -func tryReadFromChannel(t *testing.T, ch chan interface{}, expectValue bool) { - select { - case _, ok := <-ch: - if !ok { - t.Errorf("The channel is closed") - } - if !expectValue { - t.Errorf("Does not expect value from the channel, but got a value") - } - default: - if expectValue { - t.Errorf("Expect value from the channel, but got none") - } - } -} - type fakeNodeLister struct { cache []*v1.Node err error