diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go
index 5a6a5079f82..27cd0f051df 100644
--- a/staging/src/k8s.io/cloud-provider/controllers/service/controller.go
+++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller.go
@@ -93,10 +93,10 @@ type Controller struct {
 	serviceQueue workqueue.RateLimitingInterface
 	nodeQueue    workqueue.RateLimitingInterface
 	// lastSyncedNodes is used when reconciling node state and keeps track of
-	// the last synced set of nodes. This field is concurrently safe because the
-	// nodeQueue is serviced by only one go-routine, so node events are not
-	// processed concurrently.
-	lastSyncedNodes []*v1.Node
+	// the last synced set of nodes per service key. This is accessed from the
+	// service and node controllers, hence it is protected by a lock.
+	lastSyncedNodes     map[string][]*v1.Node
+	lastSyncedNodesLock sync.Mutex
 }
 
 // New returns a new service controller to keep cloud provider service resources
@@ -124,7 +124,7 @@ func New(
 		nodeListerSynced: nodeInformer.Informer().HasSynced,
 		serviceQueue:     workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
 		nodeQueue:        workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "node"),
-		lastSyncedNodes:  []*v1.Node{},
+		lastSyncedNodes:  make(map[string][]*v1.Node),
 	}
 
 	serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
@@ -439,16 +439,32 @@ func (c *Controller) ensureLoadBalancer(ctx context.Context, service *v1.Service
 	if err != nil {
 		return nil, err
 	}
-
 	// If there are no available nodes for LoadBalancer service, make a EventTypeWarning event for it.
 	if len(nodes) == 0 {
 		c.eventRecorder.Event(service, v1.EventTypeWarning, "UnAvailableLoadBalancer", "There are no available nodes for LoadBalancer")
 	}
-
-	// - Only one protocol supported per service
 	// - Not all cloud providers support all protocols and the next step is expected to return
 	//   an error for unsupported protocols
-	return c.balancer.EnsureLoadBalancer(ctx, c.clusterName, service, nodes)
+	status, err := c.balancer.EnsureLoadBalancer(ctx, c.clusterName, service, nodes)
+	if err != nil {
+		return nil, err
+	}
+	c.storeLastSyncedNodes(service, nodes)
+	return status, nil
+}
+
+func (c *Controller) storeLastSyncedNodes(svc *v1.Service, nodes []*v1.Node) {
+	c.lastSyncedNodesLock.Lock()
+	defer c.lastSyncedNodesLock.Unlock()
+	key, _ := cache.MetaNamespaceKeyFunc(svc)
+	c.lastSyncedNodes[key] = nodes
+}
+
+func (c *Controller) getLastSyncedNodes(svc *v1.Service) []*v1.Node {
+	c.lastSyncedNodesLock.Lock()
+	defer c.lastSyncedNodesLock.Unlock()
+	key, _ := cache.MetaNamespaceKeyFunc(svc)
+	return c.lastSyncedNodes[key]
 }
 
 // ListKeys implements the interface required by DeltaFIFO to list the keys we
@@ -662,15 +678,6 @@ func portEqualForLB(x, y *v1.ServicePort) bool {
 	return true
 }
 
-func serviceKeys(services []*v1.Service) sets.String {
-	ret := sets.NewString()
-	for _, service := range services {
-		key, _ := cache.MetaNamespaceKeyFunc(service)
-		ret.Insert(key)
-	}
-	return ret
-}
-
 func nodeNames(nodes []*v1.Node) sets.String {
 	ret := sets.NewString()
 	for _, node := range nodes {
@@ -737,19 +744,29 @@ func (c *Controller) syncNodes(ctx context.Context, workers int) sets.String {
 // load balancers and finished doing it successfully, or didn't try to at all because
 // there's no need. This function returns true if we tried to update load balancers and
 // failed, indicating to the caller that we should try again.
-func (c *Controller) nodeSyncService(svc *v1.Service, oldNodes, newNodes []*v1.Node) bool {
+func (c *Controller) nodeSyncService(svc *v1.Service) bool {
 	const retSuccess = false
 	const retNeedRetry = true
 	if svc == nil || !wantsLoadBalancer(svc) {
 		return retSuccess
 	}
+	newNodes, err := listWithPredicates(c.nodeLister)
+	if err != nil {
+		runtime.HandleError(fmt.Errorf("failed to retrieve node list: %v", err))
+		nodeSyncErrorCount.Inc()
+		return retNeedRetry
+	}
 	newNodes = filterWithPredicates(newNodes, getNodePredicatesForService(svc)...)
-	oldNodes = filterWithPredicates(oldNodes, getNodePredicatesForService(svc)...)
-
+	oldNodes := filterWithPredicates(c.getLastSyncedNodes(svc), getNodePredicatesForService(svc)...)
+	// Store last synced nodes without actually determining if we successfully
+	// synced them or not. Failed node syncs are passed off to retries in the
+	// service queue, so no need to wait. If we don't store it now, we risk
+	// re-syncing all LBs twice, one from another sync in the node sync and
+	// from the service sync
+	c.storeLastSyncedNodes(svc, newNodes)
 	if nodesSufficientlyEqual(oldNodes, newNodes) {
 		return retSuccess
 	}
-
 	klog.V(4).Infof("nodeSyncService started for service %s/%s", svc.Namespace, svc.Name)
 	if err := c.lockedUpdateLoadBalancerHosts(svc, newNodes); err != nil {
 		runtime.HandleError(fmt.Errorf("failed to update load balancer hosts for service %s/%s: %v", svc.Namespace, svc.Name, err))
@@ -794,20 +811,12 @@ func (c *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1.Service, workers int) (servicesToRetry sets.String) {
 	klog.V(4).Infof("Running updateLoadBalancerHosts(len(services)==%d, workers==%d)", len(services), workers)
 
-	// Include all nodes and let nodeSyncService filter and figure out if
-	// the update is relevant for the service in question.
-	nodes, err := listWithPredicates(c.nodeLister)
-	if err != nil {
-		runtime.HandleError(fmt.Errorf("failed to retrieve node list: %v", err))
-		return serviceKeys(services)
-	}
-
 	// lock for servicesToRetry
 	servicesToRetry = sets.NewString()
 	lock := sync.Mutex{}
 	doWork := func(piece int) {
-		if shouldRetry := c.nodeSyncService(services[piece], c.lastSyncedNodes, nodes); !shouldRetry {
+		if shouldRetry := c.nodeSyncService(services[piece]); !shouldRetry {
 			return
 		}
 		lock.Lock()
@@ -816,7 +825,6 @@ func (c *Controller) updateLoadBalancerHosts(ctx context.Context, services []*v1
 		servicesToRetry.Insert(key)
 	}
 	workqueue.ParallelizeUntil(ctx, workers, len(services), doWork)
-	c.lastSyncedNodes = nodes
 	klog.V(4).Infof("Finished updateLoadBalancerHosts")
 	return servicesToRetry
 }
diff --git a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go
index bcc902f4930..157a6676746 100644
--- a/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go
+++ b/staging/src/k8s.io/cloud-provider/controllers/service/controller_test.go
@@ -177,7 +177,7 @@ func newController(stopCh <-chan struct{}, objects ...runtime.Object) (*Controll
 		nodeListerSynced: nodeInformer.Informer().HasSynced,
 		serviceQueue:     workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
 		nodeQueue:        workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "node"),
-		lastSyncedNodes:  []*v1.Node{},
+		lastSyncedNodes:  make(map[string][]*v1.Node),
 	}
 
 	informerFactory.Start(stopCh)
@@ -609,7 +609,10 @@ func TestNodeChangesForExternalTrafficPolicyLocalServices(t *testing.T) {
 			ctx, cancel := context.WithCancel(context.Background())
 			defer cancel()
 
-			controller.lastSyncedNodes = tc.initialState
+			for _, svc := range services {
+				key, _ := cache.MetaNamespaceKeyFunc(svc)
+				controller.lastSyncedNodes[key] = tc.initialState
+			}
 
 			for _, state := range tc.stateChanges {
 				setupState := func() {
@@ -782,7 +785,10 @@ func TestNodeChangesForStableNodeSetEnabled(t *testing.T) {
 			ctx, cancel := context.WithCancel(context.Background())
 			defer cancel()
 
-			controller.lastSyncedNodes = tc.initialState
+			for _, svc := range services {
+				key, _ := cache.MetaNamespaceKeyFunc(svc)
+				controller.lastSyncedNodes[key] = tc.initialState
+			}
 
 			for _, state := range tc.stateChanges {
 				setupState := func() {
@@ -989,7 +995,10 @@ func TestNodesNotEqual(t *testing.T) {
 			defer cancel()
 
 			controller.nodeLister = newFakeNodeLister(nil, tc.newNodes...)
-			controller.lastSyncedNodes = tc.lastSyncNodes
+			for _, svc := range services {
+				key, _ := cache.MetaNamespaceKeyFunc(svc)
+				controller.lastSyncedNodes[key] = tc.lastSyncNodes
+			}
 
 			controller.updateLoadBalancerHosts(ctx, services, 5)
 			compareUpdateCalls(t, tc.expectedUpdateCalls, cloud.UpdateCalls)
@@ -1420,9 +1429,8 @@ func TestSlowNodeSync(t *testing.T) {
 		{Service: service1, Hosts: []*v1.Node{node1, node2}},
 		// Second update call for impacted service from controller.syncService
 		{Service: service2, Hosts: []*v1.Node{node1, node2, node3}},
-		// Third update call for second service from controller.syncNodes. Here
-		// is the problem: this update call removes the previously added node3.
-		{Service: service2, Hosts: []*v1.Node{node1, node2}},
+		// Third update call for second service from controller.syncNodes.
+		{Service: service2, Hosts: []*v1.Node{node1, node2, node3}},
 	}
 
 	wg := sync.WaitGroup{}
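
The crux of the patch is that lastSyncedNodes moves from a single slice, previously safe only because one goroutine drained the node queue, to a per-service-key map that both the service sync path (ensureLoadBalancer) and the node sync path (nodeSyncService) read and write, so it now needs its own mutex. The standalone sketch below illustrates that pattern in isolation. It is not the controller code: the lastSyncedCache type and its method names are hypothetical, chosen only to mirror storeLastSyncedNodes/getLastSyncedNodes above, and the real code derives the map key with cache.MetaNamespaceKeyFunc on the Service object.

package main

import (
	"fmt"
	"sync"
)

// lastSyncedCache is a hypothetical, minimal stand-in for the pattern the patch
// introduces: last-synced node sets keyed by service key, guarded by a mutex
// because the service-sync and node-sync goroutines access it concurrently.
type lastSyncedCache struct {
	mu    sync.Mutex
	nodes map[string][]string // "namespace/name" -> node names last synced to that LB
}

func newLastSyncedCache() *lastSyncedCache {
	return &lastSyncedCache{nodes: make(map[string][]string)}
}

// store records the node set that was last synced for a service key.
func (c *lastSyncedCache) store(key string, nodes []string) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.nodes[key] = nodes
}

// get returns the node set last synced for a service key (nil if never synced).
func (c *lastSyncedCache) get(key string) []string {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.nodes[key]
}

func main() {
	lastSynced := newLastSyncedCache()

	// Simulate the two writers: a service sync and a node sync updating
	// different services at the same time.
	var wg sync.WaitGroup
	wg.Add(2)
	go func() {
		defer wg.Done()
		lastSynced.store("default/service1", []string{"node1", "node2"})
	}()
	go func() {
		defer wg.Done()
		lastSynced.store("default/service2", []string{"node1", "node2", "node3"})
	}()
	wg.Wait()

	fmt.Println(lastSynced.get("default/service1"), lastSynced.get("default/service2"))
}

Tracking the last synced nodes per service, rather than as one global slice, is what lets the node sync skip or update each load balancer based on that service's own previously synced set, which is why TestSlowNodeSync now expects node3 to survive the third update call instead of being stripped away.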