From 72c1c6559eb5c74000d8859141772b8025538985 Mon Sep 17 00:00:00 2001 From: Alexander Constantinescu Date: Thu, 21 Jul 2022 10:35:45 +0200 Subject: [PATCH] Cleanup service sync path It dawned on me that `needsFullSync` can never be false. `needsFullSync` was used to compare the set of nodes that were existing last time the node event handler was triggered, with the current set of node for this run. However, if `triggerNodeSync` gets called it's always because the set of nodes have changed due to a condition changing on one node, or a new node being added/removed. If `needsFullSync` can never be false then a lot of things in the service sync path was just spurious, for ex: `servicesToRetry`, `knownHosts`. Essentially: if we ever need to `triggerNodeSync` then the set of nodes have somehow changed and we always need to re-sync all services. Before this patch series there was a possibility for `needsFullSync` to be set to false. `shouldSyncNode` and the predicates used to list nodes were not aligned, specifically for Unschedulable nodes. This means that we could have been triggered by a change to the schedulable state but not actually computed any diffs between the old vs. new nodes. Meaning, whenever there was a change in schedulable state we would just try to re-sync all service updates that might have failed when we synced last time. But I believe this to be an overlooked coincidence, rather than something actually intended. --- .../controllers/service/controller.go | 71 ++----------------- .../controllers/service/controller_test.go | 53 +------------- 2 files changed, 7 insertions(+), 117 deletions(-) diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go index 4c9a45510bc..f6fc1a03231 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go +++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go @@ -74,12 +74,10 @@ type serviceCache struct { // Controller keeps cloud provider service resources // (like load balancers) in sync with the registry. type Controller struct { - cloud cloudprovider.Interface - knownHosts []*v1.Node - servicesToUpdate sets.String - kubeClient clientset.Interface - clusterName string - balancer cloudprovider.LoadBalancer + cloud cloudprovider.Interface + kubeClient clientset.Interface + clusterName string + balancer cloudprovider.LoadBalancer // TODO(#85155): Stop relying on this and remove the cache completely. cache *serviceCache serviceLister corelisters.ServiceLister @@ -96,8 +94,6 @@ type Controller struct { nodeSyncLock sync.Mutex // nodeSyncCh triggers nodeSyncLoop to run nodeSyncCh chan interface{} - // needFullSync indicates if the nodeSyncInternal will do a full node sync on all LB services. - needFullSync bool // lastSyncedNodes is used when reconciling node state and keeps track of the last synced set of // nodes. Access to this attribute by multiple go-routines is protected by nodeSyncLock lastSyncedNodes []*v1.Node @@ -125,7 +121,6 @@ func New( registerMetrics() s := &Controller{ cloud: cloud, - knownHosts: []*v1.Node{}, kubeClient: kubeClient, clusterName: clusterName, cache: &serviceCache{serviceMap: make(map[string]*cachedService)}, @@ -200,15 +195,6 @@ func New( return s, nil } -// needFullSyncAndUnmark returns the value and needFullSync and marks the field to false. -func (c *Controller) needFullSyncAndUnmark() bool { - c.nodeSyncLock.Lock() - defer c.nodeSyncLock.Unlock() - ret := c.needFullSync - c.needFullSync = false - return ret -} - // obj could be an *v1.Service, or a DeletionFinalStateUnknown marker item. func (c *Controller) enqueueService(obj interface{}) { key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj) @@ -261,22 +247,6 @@ func (c *Controller) Run(ctx context.Context, workers int, controllerManagerMetr func (c *Controller) triggerNodeSync() { c.nodeSyncLock.Lock() defer c.nodeSyncLock.Unlock() - newHosts, err := listWithPredicates(c.nodeLister, allNodePredicates...) - if err != nil { - runtime.HandleError(fmt.Errorf("Failed to retrieve current set of nodes from node lister: %v", err)) - // if node list cannot be retrieve, trigger full node sync to be safe. - c.needFullSync = true - } else if !nodeSlicesEqualForLB(newHosts, c.knownHosts) { - // Here the last known state is recorded as knownHosts. For each - // LB update, the latest node list is retrieved. This is to prevent - // a stale set of nodes were used to be update loadbalancers when - // there are many loadbalancers in the clusters. nodeSyncInternal - // would be triggered until all loadbalancers are updated to the new state. - klog.V(2).Infof("Node changes detected, triggering a full node sync on all loadbalancer services") - c.needFullSync = true - c.knownHosts = newHosts - } - select { case c.nodeSyncCh <- struct{}{}: klog.V(4).Info("Triggering nodeSync") @@ -688,13 +658,6 @@ func nodeNames(nodes []*v1.Node) sets.String { return ret } -func nodeSlicesEqualForLB(x, y []*v1.Node) bool { - if len(x) != len(y) { - return false - } - return nodeNames(x).Equal(nodeNames(y)) -} - func shouldSyncUpdatedNode(oldNode, newNode *v1.Node) bool { // Evaluate the individual node exclusion predicate before evaluating the // compounded result of all predicates. We don't sync ETP=local services @@ -722,33 +685,12 @@ func (c *Controller) nodeSyncInternal(ctx context.Context, workers int) { nodeSyncLatency.Observe(latency) }() - if !c.needFullSyncAndUnmark() { - // The set of nodes in the cluster hasn't changed, but we can retry - // updating any services that we failed to update last time around. - // It is required to call `c.cache.get()` on each Service in case there was - // an update event that occurred between retries. - var servicesToUpdate []*v1.Service - for key := range c.servicesToUpdate { - cachedService, exist := c.cache.get(key) - if !exist { - klog.Errorf("Service %q should be in the cache but not", key) - continue - } - servicesToUpdate = append(servicesToUpdate, cachedService.state) - } - - c.servicesToUpdate = c.updateLoadBalancerHosts(ctx, servicesToUpdate, workers) - return - } klog.V(2).Infof("Syncing backends for all LB services.") - - // Try updating all services, and save the failed ones to try again next - // round. servicesToUpdate := c.cache.allServices() numServices := len(servicesToUpdate) - c.servicesToUpdate = c.updateLoadBalancerHosts(ctx, servicesToUpdate, workers) + servicesToRetry := c.updateLoadBalancerHosts(ctx, servicesToUpdate, workers) klog.V(2).Infof("Successfully updated %d out of %d load balancers to direct traffic to the updated set of nodes", - numServices-len(c.servicesToUpdate), numServices) + numServices-len(servicesToRetry), numServices) } // nodeSyncService syncs the nodes for one load balancer type service. The return value @@ -803,7 +745,6 @@ func (c *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1 key := fmt.Sprintf("%s/%s", services[piece].Namespace, services[piece].Name) servicesToRetry.Insert(key) } - workqueue.ParallelizeUntil(ctx, workers, len(services), doWork) c.lastSyncedNodes = nodes klog.V(4).Infof("Finished updateLoadBalancerHosts") diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go index c6079a1b515..34c694e9ce0 100644 --- a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go +++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go @@ -26,6 +26,7 @@ import ( "testing" "time" + "github.com/stretchr/testify/assert" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -46,8 +47,6 @@ import ( servicehelper "k8s.io/cloud-provider/service/helpers" utilpointer "k8s.io/utils/pointer" - - "github.com/stretchr/testify/assert" ) const region = "us-central" @@ -101,7 +100,6 @@ func newController() (*Controller, *fakecloud.Cloud, *fake.Clientset) { controller := &Controller{ cloud: cloud, - knownHosts: []*v1.Node{}, kubeClient: kubeClient, clusterName: "test-cluster", cache: &serviceCache{serviceMap: make(map[string]*cachedService)}, @@ -566,7 +564,6 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) { defer cancel() controller, cloud, _ := newController() controller.nodeLister = newFakeNodeLister(nil, nodes...) - if servicesToRetry := controller.updateLoadBalancerHosts(ctx, item.services, item.workers); len(servicesToRetry) != 0 { t.Errorf("for case %q, unexpected servicesToRetry: %v", item.desc, servicesToRetry) } @@ -1516,28 +1513,6 @@ func TestServiceCache(t *testing.T) { } } -// Test a utility functions as it's not easy to unit test nodeSyncInternal directly -func TestNodeSlicesEqualForLB(t *testing.T) { - numNodes := 10 - nArray := make([]*v1.Node, numNodes) - mArray := make([]*v1.Node, numNodes) - for i := 0; i < numNodes; i++ { - nArray[i] = &v1.Node{} - nArray[i].Name = fmt.Sprintf("node%d", i) - } - for i := 0; i < numNodes; i++ { - mArray[i] = &v1.Node{} - mArray[i].Name = fmt.Sprintf("node%d", i+1) - } - - if !nodeSlicesEqualForLB(nArray, nArray) { - t.Errorf("nodeSlicesEqualForLB() Expected=true Obtained=false") - } - if nodeSlicesEqualForLB(nArray, mArray) { - t.Errorf("nodeSlicesEqualForLB() Expected=false Obtained=true") - } -} - // TODO(@MrHohn): Verify the end state when below issue is resolved: // https://github.com/kubernetes/client-go/issues/607 func TestAddFinalizer(t *testing.T) { @@ -3227,32 +3202,6 @@ func TestTriggerNodeSync(t *testing.T) { tryReadFromChannel(t, controller.nodeSyncCh, false) } -func TestMarkAndUnmarkFullSync(t *testing.T) { - controller, _, _ := newController() - if controller.needFullSync != false { - t.Errorf("expect controller.needFullSync to be false, but got true") - } - - ret := controller.needFullSyncAndUnmark() - if ret != false { - t.Errorf("expect ret == false, but got true") - } - - ret = controller.needFullSyncAndUnmark() - if ret != false { - t.Errorf("expect ret == false, but got true") - } - controller.needFullSync = true - ret = controller.needFullSyncAndUnmark() - if ret != true { - t.Errorf("expect ret == true, but got false") - } - ret = controller.needFullSyncAndUnmark() - if ret != false { - t.Errorf("expect ret == false, but got true") - } -} - func tryReadFromChannel(t *testing.T, ch chan interface{}, expectValue bool) { select { case _, ok := <-ch: