From bb3cba0d16409fa349f253b90e97fa220a4ec5e8 Mon Sep 17 00:00:00 2001
From: Alexander Constantinescu
Date: Thu, 21 Jul 2022 12:11:54 +0200
Subject: [PATCH] Avoid re-syncing LBs for ETP=local svc

This removes the CCM's LB sync for nodes which are only transitioning
their readiness state.
---
 .../controllers/service/controller.go      |  83 +++++--
 .../controllers/service/controller_test.go | 215 +++++++++++++++++-
 2 files changed, 277 insertions(+), 21 deletions(-)

diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go
index f1944d1fd41..79a886500d5 100644
--- a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go
+++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go
@@ -97,6 +97,9 @@ type Controller struct {
     nodeSyncCh chan interface{}
     // needFullSync indicates if the nodeSyncInternal will do a full node sync on all LB services.
     needFullSync bool
+    // lastSyncedNodes is used when reconciling node state and keeps track of the last synced set of
+    // nodes. Access to this attribute by multiple go-routines is protected by nodeSyncLock
+    lastSyncedNodes []*v1.Node
 }
 
 // New returns a new service controller to keep cloud provider service resources
@@ -131,7 +134,8 @@ func New(
         nodeListerSynced: nodeInformer.Informer().HasSynced,
         queue:            workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
         // nodeSyncCh has a size 1 buffer. Only one pending sync signal would be cached.
-        nodeSyncCh: make(chan interface{}, 1),
+        nodeSyncCh:      make(chan interface{}, 1),
+        lastSyncedNodes: []*v1.Node{},
     }
 
     serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
@@ -450,7 +454,7 @@ func (c *Controller) syncLoadBalancerIfNeeded(ctx context.Context, service *v1.S
 }
 
 func (c *Controller) ensureLoadBalancer(ctx context.Context, service *v1.Service) (*v1.LoadBalancerStatus, error) {
-    nodes, err := listWithPredicates(c.nodeLister, allNodePredicates...)
+    nodes, err := listWithPredicates(c.nodeLister, getNodePredicatesForService(service)...)
     if err != nil {
         return nil, err
     }
@@ -664,6 +668,15 @@ func portEqualForLB(x, y *v1.ServicePort) bool {
     return true
 }
 
+func serviceKeys(services []*v1.Service) sets.String {
+    ret := sets.NewString()
+    for _, service := range services {
+        key, _ := cache.MetaNamespaceKeyFunc(service)
+        ret.Insert(key)
+    }
+    return ret
+}
+
 func nodeNames(nodes []*v1.Node) sets.String {
     ret := sets.NewString()
     for _, node := range nodes {
@@ -680,6 +693,19 @@ func nodeSlicesEqualForLB(x, y []*v1.Node) bool {
 }
 
 func shouldSyncUpdatedNode(oldNode, newNode *v1.Node) bool {
+    // Evaluate the individual node exclusion predicate before evaluating the
+    // compounded result of all predicates. We don't sync ETP=local services
+    // for changes on the readiness condition, hence if a node remains NotReady
+    // and a user adds the exclusion label we will need to sync so as to make sure
+    // this change is reflected correctly on ETP=local services. The sync
+    // function compares lastSyncedNodes with the new (existing) set of nodes
+    // for each service, so services which are synced with the same set of nodes
+    // should be skipped internally in the sync function. This is needed so as to
+    // trigger a global sync for all services and make sure no service gets
+    // skipped due to a changing node predicate.
+    if respectsPredicates(oldNode, nodeIncludedPredicate) != respectsPredicates(newNode, nodeIncludedPredicate) {
+        return true
+    }
     return respectsPredicates(oldNode, allNodePredicates...) != respectsPredicates(newNode, allNodePredicates...)
 }
 
@@ -722,24 +748,29 @@ func (c *Controller) nodeSyncInternal(ctx context.Context, workers int) {
         numServices-len(c.servicesToUpdate), numServices)
 }
 
-// nodeSyncService syncs the nodes for one load balancer type service
-func (c *Controller) nodeSyncService(svc *v1.Service) bool {
+// nodeSyncService syncs the nodes for one load balancer type service. The return value
+// indicates if we should retry. Hence, this function returns false if we've updated
+// load balancers and finished doing it successfully, or didn't try to at all because
+// there's no need. This function returns true if we tried to update load balancers and
+// failed, indicating to the caller that we should try again.
+func (c *Controller) nodeSyncService(svc *v1.Service, oldNodes, newNodes []*v1.Node) bool {
+    retSuccess := false
+    retNeedRetry := true
     if svc == nil || !wantsLoadBalancer(svc) {
-        return false
+        return retSuccess
+    }
+    newNodes = filterWithPredicates(newNodes, getNodePredicatesForService(svc)...)
+    oldNodes = filterWithPredicates(oldNodes, getNodePredicatesForService(svc)...)
+    if nodeNames(newNodes).Equal(nodeNames(oldNodes)) {
+        return retSuccess
     }
     klog.V(4).Infof("nodeSyncService started for service %s/%s", svc.Namespace, svc.Name)
-    hosts, err := listWithPredicates(c.nodeLister, allNodePredicates...)
-    if err != nil {
-        runtime.HandleError(fmt.Errorf("failed to retrieve node list: %v", err))
-        return true
-    }
-
-    if err := c.lockedUpdateLoadBalancerHosts(svc, hosts); err != nil {
+    if err := c.lockedUpdateLoadBalancerHosts(svc, newNodes); err != nil {
         runtime.HandleError(fmt.Errorf("failed to update load balancer hosts for service %s/%s: %v", svc.Namespace, svc.Name, err))
-        return true
+        return retNeedRetry
     }
     klog.V(4).Infof("nodeSyncService finished successfully for service %s/%s", svc.Namespace, svc.Name)
-    return false
+    return retSuccess
 }
 
 // updateLoadBalancerHosts updates all existing load balancers so that
@@ -748,11 +779,20 @@ func (c *Controller) nodeSyncService(svc *v1.Service) bool {
 func (c *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1.Service, workers int) (servicesToRetry sets.String) {
     klog.V(4).Infof("Running updateLoadBalancerHosts(len(services)==%d, workers==%d)", len(services), workers)
 
+    // Include all nodes and let nodeSyncService filter and figure out if
+    // the update is relevant for the service in question.
+    nodes, err := listWithPredicates(c.nodeLister)
+    if err != nil {
+        runtime.HandleError(fmt.Errorf("failed to retrieve node list: %v", err))
+        return serviceKeys(services)
+    }
+
     // lock for servicesToRetry
     servicesToRetry = sets.NewString()
     lock := sync.Mutex{}
+
     doWork := func(piece int) {
-        if shouldRetry := c.nodeSyncService(services[piece]); !shouldRetry {
+        if shouldRetry := c.nodeSyncService(services[piece], c.lastSyncedNodes, nodes); !shouldRetry {
             return
         }
         lock.Lock()
@@ -762,6 +802,7 @@ func (c *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1
     }
 
     workqueue.ParallelizeUntil(ctx, workers, len(services), doWork)
+    c.lastSyncedNodes = nodes
     klog.V(4).Infof("Finished updateLoadBalancerHosts")
     return servicesToRetry
 }
@@ -944,8 +985,20 @@ var (
         nodeUnTaintedPredicate,
         nodeReadyPredicate,
     }
+    etpLocalNodePredicates []NodeConditionPredicate = []NodeConditionPredicate{
+        nodeIncludedPredicate,
+        nodeSchedulablePredicate,
+        nodeUnTaintedPredicate,
+    }
 )
 
+func getNodePredicatesForService(service *v1.Service) []NodeConditionPredicate {
+    if service.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal {
+        return etpLocalNodePredicates
+    }
+    return allNodePredicates
+}
+
 // We consider the node for load balancing only when the node is not labelled for exclusion.
 func nodeIncludedPredicate(node *v1.Node) bool {
     _, hasExcludeBalancerLabel := node.Labels[v1.LabelNodeExcludeBalancers]
diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go
index 69452bb990e..b7877004b20 100644
--- a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go
+++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go
@@ -44,6 +44,7 @@ import (
     "k8s.io/client-go/util/workqueue"
     fakecloud "k8s.io/cloud-provider/fake"
     servicehelper "k8s.io/cloud-provider/service/helpers"
+    utilpointer "k8s.io/utils/pointer"
 
     "github.com/stretchr/testify/assert"
@@ -64,6 +65,20 @@ func newService(name string, uid types.UID, serviceType v1.ServiceType) *v1.Serv
     }
 }
 
+func newETPLocalService(name string, serviceType v1.ServiceType) *v1.Service {
+    return &v1.Service{
+        ObjectMeta: metav1.ObjectMeta{
+            Name:      name,
+            Namespace: "default",
+            UID:       "777",
+        },
+        Spec: v1.ServiceSpec{
+            Type:                  serviceType,
+            ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeLocal,
+        },
+    }
+}
+
 // Wrap newService so that you don't have to call default arguments again and again.
 func defaultExternalService() *v1.Service {
     return newService("external-balancer", types.UID("123"), v1.ServiceTypeLoadBalancer)
@@ -96,6 +111,7 @@ func newController() (*Controller, *fakecloud.Cloud, *fake.Clientset) {
         nodeListerSynced: nodeInformer.Informer().HasSynced,
         queue:            workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
         nodeSyncCh:       make(chan interface{}, 1),
+        lastSyncedNodes:  []*v1.Node{},
     }
 
     balancer, _ := cloud.LoadBalancer()
@@ -559,6 +575,193 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
     }
 }
 
+func TestNodeChangesForExternalTrafficPolicyLocalServices(t *testing.T) {
+    node1 := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node0"}, Status: v1.NodeStatus{Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}}}
+    node2 := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1"}, Status: v1.NodeStatus{Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}}}
+    node2NotReady := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1"}, Status: v1.NodeStatus{Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionFalse}}}}
+    node2Tainted := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1"}, Spec: v1.NodeSpec{Taints: []v1.Taint{{Key: ToBeDeletedTaint}}}, Status: v1.NodeStatus{Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionFalse}}}}
+    node2Unschedulable := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1"}, Spec: v1.NodeSpec{Unschedulable: true}, Status: v1.NodeStatus{Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}}}
+    node2SpuriousChange := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1"}, Status: v1.NodeStatus{Phase: v1.NodeTerminated, Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}}}
+    node2Exclude := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1", Labels: map[string]string{v1.LabelNodeExcludeBalancers: ""}}, Status: v1.NodeStatus{Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}}}
+    node3 := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node73"}, Status: v1.NodeStatus{Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}}}
+
+    type stateChanges struct {
+        nodes       []*v1.Node
+        syncCallErr bool
+    }
+
+    etpLocalservice1 := newETPLocalService("s0", v1.ServiceTypeLoadBalancer)
+    etpLocalservice2 := newETPLocalService("s1", v1.ServiceTypeLoadBalancer)
+    service3 := defaultExternalService()
+
+    services := []*v1.Service{etpLocalservice1, etpLocalservice2, service3}
+
+    for _, tc := range []struct {
+        desc                string
+        expectedUpdateCalls []fakecloud.UpdateBalancerCall
+        stateChanges        []stateChanges
+        initialState        []*v1.Node
+    }{
+        {
+            desc: "No node changes",
+            initialState: []*v1.Node{node1, node2, node3},
+            stateChanges: []stateChanges{
+                {
+                    nodes: []*v1.Node{node1, node2, node3},
+                },
+            },
+            expectedUpdateCalls: []fakecloud.UpdateBalancerCall{},
+        },
+        {
+            desc: "1 new node gets added",
+            initialState: []*v1.Node{node1, node2},
+            stateChanges: []stateChanges{
+                {
+                    nodes: []*v1.Node{node1, node2, node3},
+                },
+            },
+            expectedUpdateCalls: []fakecloud.UpdateBalancerCall{
+                {Service: etpLocalservice1, Hosts: []*v1.Node{node1, node2, node3}},
+                {Service: etpLocalservice2, Hosts: []*v1.Node{node1, node2, node3}},
+                {Service: service3, Hosts: []*v1.Node{node1, node2, node3}},
+            },
+        },
+        {
+            desc: "1 new node gets added - with retries",
+            initialState: []*v1.Node{node1, node2},
+            stateChanges: []stateChanges{
+                {
+                    nodes:       []*v1.Node{node1, node2, node3},
+                    syncCallErr: true,
+                },
+                {
+                    nodes: []*v1.Node{node1, node2, node3},
+                },
+            },
+            expectedUpdateCalls: []fakecloud.UpdateBalancerCall{
+                {Service: etpLocalservice1, Hosts: []*v1.Node{node1, node2, node3}},
+                {Service: etpLocalservice2, Hosts: []*v1.Node{node1, node2, node3}},
+                {Service: service3, Hosts: []*v1.Node{node1, node2, node3}},
+            },
+        },
+        {
+            desc: "1 node goes NotReady",
+            initialState: []*v1.Node{node1, node2, node3},
+            stateChanges: []stateChanges{
+                {
+                    nodes: []*v1.Node{node1, node2NotReady, node3},
+                },
+            },
+            expectedUpdateCalls: []fakecloud.UpdateBalancerCall{
+                {Service: service3, Hosts: []*v1.Node{node1, node3}},
+            },
+        },
+        {
+            desc: "1 node gets Tainted",
+            initialState: []*v1.Node{node1, node2, node3},
+            stateChanges: []stateChanges{
+                {
+                    nodes: []*v1.Node{node1, node2Tainted, node3},
+                },
+            },
+            expectedUpdateCalls: []fakecloud.UpdateBalancerCall{
+                {Service: etpLocalservice1, Hosts: []*v1.Node{node1, node3}},
+                {Service: etpLocalservice2, Hosts: []*v1.Node{node1, node3}},
+                {Service: service3, Hosts: []*v1.Node{node1, node3}},
+            },
+        },
+        {
+            desc: "1 node goes unschedulable",
+            initialState: []*v1.Node{node1, node2, node3},
+            stateChanges: []stateChanges{
+                {
+                    nodes: []*v1.Node{node1, node2Unschedulable, node3},
+                },
+            },
+            expectedUpdateCalls: []fakecloud.UpdateBalancerCall{
+                {Service: etpLocalservice1, Hosts: []*v1.Node{node1, node3}},
+                {Service: etpLocalservice2, Hosts: []*v1.Node{node1, node3}},
+                {Service: service3, Hosts: []*v1.Node{node1, node3}},
+            },
+        },
+        {
+            desc: "1 node goes Ready",
+            initialState: []*v1.Node{node1, node2NotReady, node3},
+            stateChanges: []stateChanges{
+                {
+                    nodes: []*v1.Node{node1, node2, node3},
+                },
+            },
+            expectedUpdateCalls: []fakecloud.UpdateBalancerCall{
+                {Service: service3, Hosts: []*v1.Node{node1, node2, node3}},
+            },
+        },
+        {
+            desc: "1 node gets excluded",
+            initialState: []*v1.Node{node1, node2, node3},
+            stateChanges: []stateChanges{
+                {
+                    nodes: []*v1.Node{node1, node2Exclude, node3},
+                },
+            },
+            expectedUpdateCalls: []fakecloud.UpdateBalancerCall{
+                {Service: etpLocalservice1, Hosts: []*v1.Node{node1, node3}},
+                {Service: etpLocalservice2, Hosts: []*v1.Node{node1, node3}},
+                {Service: service3, Hosts: []*v1.Node{node1, node3}},
+            },
+        },
+        {
+            desc: "1 old node gets deleted",
+            initialState: []*v1.Node{node1, node2, node3},
+            stateChanges: []stateChanges{
+                {
+                    nodes: []*v1.Node{node1, node2},
+                },
+            },
+            expectedUpdateCalls: []fakecloud.UpdateBalancerCall{
+                {Service: etpLocalservice1, Hosts: []*v1.Node{node1, node2}},
+                {Service: etpLocalservice2, Hosts: []*v1.Node{node1, node2}},
+                {Service: service3, Hosts: []*v1.Node{node1, node2}},
+            },
+        },
+        {
+            desc: "1 spurious node update",
+            initialState: []*v1.Node{node1, node2, node3},
+            stateChanges: []stateChanges{
+                {
+                    nodes: []*v1.Node{node1, node2SpuriousChange, node3},
+                },
+            },
+            expectedUpdateCalls: []fakecloud.UpdateBalancerCall{},
+        },
+    } {
+        t.Run(tc.desc, func(t *testing.T) {
+            controller, cloud, _ := newController()
+            ctx, cancel := context.WithCancel(context.Background())
+            defer cancel()
+
+            controller.lastSyncedNodes = tc.initialState
+
+            for _, state := range tc.stateChanges {
+                setupState := func() {
+                    controller.nodeLister = newFakeNodeLister(nil, state.nodes...)
+                    if state.syncCallErr {
+                        cloud.Err = fmt.Errorf("error please")
+                    }
+                }
+                cleanupState := func() {
+                    cloud.Err = nil
+                }
+                setupState()
+                controller.updateLoadBalancerHosts(ctx, services, 3)
+                cleanupState()
+            }
+
+            compareUpdateCalls(t, tc.expectedUpdateCalls, cloud.UpdateCalls)
+        })
+    }
+}
+
 func TestNodeChangesInExternalLoadBalancer(t *testing.T) {
     node1 := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node0"}, Status: v1.NodeStatus{Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}}}
     node2 := &v1.Node{ObjectMeta: metav1.ObjectMeta{Name: "node1"}, Status: v1.NodeStatus{Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: v1.ConditionTrue}}}}
@@ -2782,7 +2985,7 @@ func Test_shouldSyncUpdatedNode_compoundedPredicates(t *testing.T) {
                     },
                 },
             },
-            shouldSync: false,
+            shouldSync: true,
         },
         {
             name: "unschedable T, excluded F->T",
@@ -2822,7 +3025,7 @@ func Test_shouldSyncUpdatedNode_compoundedPredicates(t *testing.T) {
                     },
                 },
             },
-            shouldSync: false,
+            shouldSync: true,
         },
         {
             name: "unschedable T, ready F->T",
@@ -3034,7 +3237,7 @@ func Test_shouldSyncUpdatedNode_compoundedPredicates(t *testing.T) {
                     },
                 },
             },
-            shouldSync: false,
+            shouldSync: true,
         },
         {
            name: "tainted T, excluded T->F",
@@ -3082,7 +3285,7 @@ func Test_shouldSyncUpdatedNode_compoundedPredicates(t *testing.T) {
                     },
                 },
             },
-            shouldSync: false,
+            shouldSync: true,
         },
         {
            name: "tainted T, ready F->T",
@@ -3604,7 +3807,7 @@ func Test_shouldSyncUpdatedNode_compoundedPredicates(t *testing.T) {
                     },
                 },
             },
-            shouldSync: false,
+            shouldSync: true,
         },
         {
            name: "ready F, excluded T->F",
@@ -3638,7 +3841,7 @@ func Test_shouldSyncUpdatedNode_compoundedPredicates(t *testing.T) {
                     },
                 },
             },
-            shouldSync: false,
+            shouldSync: true,
         },
     }
     for _, testcase := range testcases {
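The sketch below (standalone, not part of the patch) illustrates the two ideas the change combines: ETP=local services select a predicate set without the readiness check, and a service's load balancer is only updated when its filtered node set actually differs between the last synced nodes and the current ones. The helper names predicatesFor, filterNodes and needsSync are illustrative rather than the controller's own, and the untainted predicate is omitted for brevity; the Kubernetes types and constants used (v1.Node, v1.LabelNodeExcludeBalancers, v1.ServiceExternalTrafficPolicyTypeLocal, sets.String) are real.

package main

import (
    "fmt"

    v1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    "k8s.io/apimachinery/pkg/util/sets"
)

// nodePredicate mirrors the controller's NodeConditionPredicate shape.
type nodePredicate func(*v1.Node) bool

// included is true when the node is not labelled for load balancer exclusion.
func included(n *v1.Node) bool {
    _, excluded := n.Labels[v1.LabelNodeExcludeBalancers]
    return !excluded
}

// schedulable is true when the node is not cordoned.
func schedulable(n *v1.Node) bool { return !n.Spec.Unschedulable }

// ready is true when the NodeReady condition is True.
func ready(n *v1.Node) bool {
    for _, c := range n.Status.Conditions {
        if c.Type == v1.NodeReady {
            return c.Status == v1.ConditionTrue
        }
    }
    return false
}

// predicatesFor picks the predicate set per service, in the spirit of
// getNodePredicatesForService: ETP=local services drop the readiness
// predicate (the untainted check is omitted in this sketch).
func predicatesFor(svc *v1.Service) []nodePredicate {
    if svc.Spec.ExternalTrafficPolicy == v1.ServiceExternalTrafficPolicyTypeLocal {
        return []nodePredicate{included, schedulable}
    }
    return []nodePredicate{included, schedulable, ready}
}

// filterNodes returns the names of the nodes that pass every predicate.
func filterNodes(nodes []*v1.Node, preds ...nodePredicate) sets.String {
    names := sets.NewString()
    for _, n := range nodes {
        keep := true
        for _, p := range preds {
            if !p(n) {
                keep = false
                break
            }
        }
        if keep {
            names.Insert(n.Name)
        }
    }
    return names
}

// needsSync mirrors nodeSyncService's early return: only touch the load
// balancer when the filtered node set changed for this particular service.
func needsSync(svc *v1.Service, oldNodes, newNodes []*v1.Node) bool {
    preds := predicatesFor(svc)
    return !filterNodes(oldNodes, preds...).Equal(filterNodes(newNodes, preds...))
}

func main() {
    node := func(name string, isReady bool) *v1.Node {
        status := v1.ConditionTrue
        if !isReady {
            status = v1.ConditionFalse
        }
        return &v1.Node{
            ObjectMeta: metav1.ObjectMeta{Name: name},
            Status:     v1.NodeStatus{Conditions: []v1.NodeCondition{{Type: v1.NodeReady, Status: status}}},
        }
    }
    before := []*v1.Node{node("node1", true), node("node2", true)}
    after := []*v1.Node{node("node1", true), node("node2", false)} // node2 goes NotReady

    etpLocal := &v1.Service{Spec: v1.ServiceSpec{ExternalTrafficPolicy: v1.ServiceExternalTrafficPolicyTypeLocal}}
    etpCluster := &v1.Service{}

    fmt.Println(needsSync(etpLocal, before, after))   // false: readiness-only change, LB left alone
    fmt.Println(needsSync(etpCluster, before, after)) // true: node2 must be removed from the LB
}

Under these assumptions, a node flipping between Ready and NotReady only changes the filtered set for ETP=cluster services, which is why the "1 node goes NotReady" and "1 node goes Ready" cases in the test table above expect an update call for service3 only.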