From c8a12b8045c25b37e66a3c46b93939fe3a63d5c7 Mon Sep 17 00:00:00 2001 From: Josh Horwitz Date: Wed, 8 Nov 2017 14:45:24 -0500 Subject: [PATCH 1/2] Fixes service controller update race condition --- pkg/controller/service/service_controller.go | 72 ++++---- .../service/service_controller_test.go | 161 ++++++++++++++---- 2 files changed, 161 insertions(+), 72 deletions(-) diff --git a/pkg/controller/service/service_controller.go b/pkg/controller/service/service_controller.go index 6e1a350539d..27e6b5643cb 100644 --- a/pkg/controller/service/service_controller.go +++ b/pkg/controller/service/service_controller.go @@ -89,7 +89,6 @@ type serviceCache struct { type ServiceController struct { cloud cloudprovider.Interface knownHosts []*v1.Node - servicesToUpdate []*v1.Service kubeClient clientset.Interface clusterName string balancer cloudprovider.LoadBalancer @@ -241,6 +240,20 @@ func (s *ServiceController) processServiceUpdate(cachedService *cachedService, s } } } + + if cachedService.state != nil { + if !s.needsUpdate(cachedService.state, service) { + // The service does not require an update which means it was placed on the work queue + // by the node sync loop and indicates that the hosts need to be updated. + err := s.updateLoadBalancerHosts(service) + if err != nil { + return err, cachedService.nextRetryDelay() + } + cachedService.resetRetryDelay() + return nil, doNotRetry + } + } + // cache the service, we need the info for service deletion cachedService.state = service err, retry := s.createLoadBalancerIfNeeded(key, service) @@ -435,6 +448,8 @@ func (s *serviceCache) delete(serviceName string) { delete(s.serviceMap, serviceName) } +// needsUpdate checks to see if there were any changes between the old and new service that would require a load balancer update. +// This method does not and should not check if the hosts have changed. 
func (s *ServiceController) needsUpdate(oldService *v1.Service, newService *v1.Service) bool { if !wantsLoadBalancer(oldService) && !wantsLoadBalancer(newService) { return false @@ -637,62 +652,45 @@ func getNodeConditionPredicate() corelisters.NodeConditionPredicate { } } -// nodeSyncLoop handles updating the hosts pointed to by all load -// balancers whenever the set of nodes in the cluster changes. +// nodeSyncLoop handles adding all existing cached services to the work queue +// to be reprocessed so that they can have their hosts updated, if any +// host changes have occurred since the last sync loop. func (s *ServiceController) nodeSyncLoop() { newHosts, err := s.nodeLister.ListWithPredicate(getNodeConditionPredicate()) if err != nil { glog.Errorf("Failed to retrieve current set of nodes from node lister: %v", err) return } + if nodeSlicesEqualForLB(newHosts, s.knownHosts) { - // The set of nodes in the cluster hasn't changed, but we can retry - // updating any services that we failed to update last time around. - s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts) + // Nothing to do since the hosts have not changed. return } - glog.Infof("Detected change in list of current cluster nodes. New node set: %v", - nodeNames(newHosts)) + glog.Infof("Detected change in list of current cluster nodes. New node set: %v", nodeNames(newHosts)) - // Try updating all services, and save the ones that fail to try again next - // round. - s.servicesToUpdate = s.cache.allServices() - numServices := len(s.servicesToUpdate) - s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts) - glog.Infof("Successfully updated %d out of %d load balancers to direct traffic to the updated set of nodes", - numServices-len(s.servicesToUpdate), numServices) + for _, svc := range s.cache.allServices() { + s.enqueueService(svc) + } + // Update the known hosts so we can check next sync loop for changes. 
s.knownHosts = newHosts } -// updateLoadBalancerHosts updates all existing load balancers so that -// they will match the list of hosts provided. -// Returns the list of services that couldn't be updated. -func (s *ServiceController) updateLoadBalancerHosts(services []*v1.Service, hosts []*v1.Node) (servicesToRetry []*v1.Service) { - for _, service := range services { - func() { - if service == nil { - return - } - if err := s.lockedUpdateLoadBalancerHosts(service, hosts); err != nil { - glog.Errorf("External error while updating load balancer: %v.", err) - servicesToRetry = append(servicesToRetry, service) - } - }() - } - return servicesToRetry -} - -// Updates the load balancer of a service, assuming we hold the mutex -// associated with the service. -func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *v1.Service, hosts []*v1.Node) error { +// Updates the load balancer of the service with updated nodes ONLY. +// This method will not trigger the cloud provider to create or fully update a load balancer. +func (s *ServiceController) updateLoadBalancerHosts(service *v1.Service) error { if !wantsLoadBalancer(service) { return nil } + hosts, err := s.nodeLister.ListWithPredicate(getNodeConditionPredicate()) + if err != nil { + return err + } + // This operation doesn't normally take very long (and happens pretty often), so we only record the final event - err := s.balancer.UpdateLoadBalancer(s.clusterName, service, hosts) + err = s.balancer.UpdateLoadBalancer(s.clusterName, service, hosts) if err == nil { // If there are no available nodes for LoadBalancer service, make a EventTypeWarning event for it. 
if len(hosts) == 0 { diff --git a/pkg/controller/service/service_controller_test.go b/pkg/controller/service/service_controller_test.go index be5e8ddde2d..a0be7c97fc5 100644 --- a/pkg/controller/service/service_controller_test.go +++ b/pkg/controller/service/service_controller_test.go @@ -19,6 +19,7 @@ package service import ( "fmt" "reflect" + "sort" "testing" "time" @@ -27,6 +28,8 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes/fake" + corelisters "k8s.io/client-go/listers/core/v1" + "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/record" "k8s.io/kubernetes/pkg/api/testapi" fakecloud "k8s.io/kubernetes/pkg/cloudprovider/providers/fake" @@ -174,23 +177,45 @@ func TestCreateExternalLoadBalancer(t *testing.T) { } } +// newLoadBalancerNode returns a node that passes the predicate check for a +// node to receive load balancer traffic. +func newLoadBalancerNode(name string) *v1.Node { + return &v1.Node{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + }, + Spec: v1.NodeSpec{ + Unschedulable: false, + }, + Status: v1.NodeStatus{ + Conditions: []v1.NodeCondition{ + {Type: v1.NodeReady, Status: v1.ConditionTrue}, + }, + }, + } +} + +func sortNodesByName(nodes []*v1.Node) { + sort.Slice(nodes, func(i, j int) bool { + return nodes[i].Name < nodes[j].Name + }) +} + // TODO: Finish converting and update comments func TestUpdateNodesInExternalLoadBalancer(t *testing.T) { + nodes := []*v1.Node{ - {ObjectMeta: metav1.ObjectMeta{Name: "node0"}}, - {ObjectMeta: metav1.ObjectMeta{Name: "node1"}}, - {ObjectMeta: metav1.ObjectMeta{Name: "node73"}}, + newLoadBalancerNode("node0"), + newLoadBalancerNode("node1"), + newLoadBalancerNode("node73"), } - table := []struct { + sortNodesByName(nodes) + + table := map[string]struct { services []*v1.Service expectedUpdateCalls []fakecloud.FakeUpdateBalancerCall }{ - { - // No services present: no calls should be made. 
- services: []*v1.Service{}, - expectedUpdateCalls: nil, - }, - { + "update no load balancer": { // Services do not have external load balancers: no calls should be made. services: []*v1.Service{ newService("s0", "111", v1.ServiceTypeClusterIP), @@ -198,7 +223,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) { }, expectedUpdateCalls: nil, }, - { + "update 1 load balancer": { // Services does have an external load balancer: one call should be made. services: []*v1.Service{ newService("s0", "333", v1.ServiceTypeLoadBalancer), @@ -207,7 +232,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) { {Service: newService("s0", "333", v1.ServiceTypeLoadBalancer), Hosts: nodes}, }, }, - { + "update 3 load balancers": { // Three services have an external load balancer: three calls. services: []*v1.Service{ newService("s0", "444", v1.ServiceTypeLoadBalancer), @@ -220,7 +245,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) { {Service: newService("s2", "666", v1.ServiceTypeLoadBalancer), Hosts: nodes}, }, }, - { + "update 2 load balancers": { // Two services have an external load balancer and two don't: two calls. services: []*v1.Service{ newService("s0", "777", v1.ServiceTypeNodePort), @@ -233,30 +258,44 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) { {Service: newService("s3", "999", v1.ServiceTypeLoadBalancer), Hosts: nodes}, }, }, - { - // One service has an external load balancer and one is nil: one call. 
- services: []*v1.Service{ - newService("s0", "234", v1.ServiceTypeLoadBalancer), - nil, - }, - expectedUpdateCalls: []fakecloud.FakeUpdateBalancerCall{ - {Service: newService("s0", "234", v1.ServiceTypeLoadBalancer), Hosts: nodes}, - }, - }, } - for _, item := range table { - controller, cloud, _ := newController() - var services []*v1.Service - for _, service := range item.services { - services = append(services, service) - } - if err := controller.updateLoadBalancerHosts(services, nodes); err != nil { - t.Errorf("unexpected error: %v", err) - } - if !reflect.DeepEqual(item.expectedUpdateCalls, cloud.UpdateCalls) { - t.Errorf("expected update calls mismatch, expected %+v, got %+v", item.expectedUpdateCalls, cloud.UpdateCalls) - } + for name, item := range table { + t.Run(name, func(t *testing.T) { + controller, cloud, _ := newController() + + var services []*v1.Service + for _, service := range item.services { + services = append(services, service) + } + nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) + for _, node := range nodes { + nodeIndexer.Add(node) + } + controller.nodeLister = corelisters.NewNodeLister(nodeIndexer) + + for _, service := range services { + if err := controller.updateLoadBalancerHosts(service); err != nil { + t.Errorf("unexpected error: %v", err) + } + } + + if len(item.expectedUpdateCalls) != len(cloud.UpdateCalls) { + t.Errorf("expected %d update calls but only got %d", len(item.expectedUpdateCalls), len(cloud.UpdateCalls)) + } + + for i, expectedCall := range item.expectedUpdateCalls { + actualCall := cloud.UpdateCalls[i] + if !reflect.DeepEqual(expectedCall.Service, actualCall.Service) { + t.Errorf("expected update call to contain service %+v, got %+v", expectedCall.Service, actualCall.Service) + } + + sortNodesByName(actualCall.Hosts) + if !reflect.DeepEqual(expectedCall.Hosts, actualCall.Hosts) { + t.Errorf("expected update call to contain hosts %+v, got %+v", expectedCall.Hosts, actualCall.Hosts) + } + } 
+ }) } } @@ -311,6 +350,13 @@ func TestProcessServiceUpdate(t *testing.T) { var controller *ServiceController var cloud *fakecloud.FakeCloud + nodes := []*v1.Node{ + newLoadBalancerNode("node0"), + newLoadBalancerNode("node1"), + newLoadBalancerNode("node73"), + } + sortNodesByName(nodes) + //A pair of old and new loadbalancer IP address oldLBIP := "192.168.1.1" newLBIP := "192.168.1.11" @@ -344,6 +390,51 @@ func TestProcessServiceUpdate(t *testing.T) { return nil }, }, + { + testName: "If updating hosts only", + key: "default/sync-test-name", + svc: newService("sync-test-name", types.UID("sync-test-uid"), v1.ServiceTypeLoadBalancer), + updateFn: func(svc *v1.Service) *v1.Service { + keyExpected := svc.GetObjectMeta().GetNamespace() + "/" + svc.GetObjectMeta().GetName() + cachedServiceTest := controller.cache.getOrCreate(keyExpected) + cachedServiceTest.state = svc + controller.cache.set(keyExpected, cachedServiceTest) + + // Set the nodes for the cloud's UpdateLoadBalancer call to use. 
+ nodeIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{}) + for _, node := range nodes { + nodeIndexer.Add(node) + } + controller.nodeLister = corelisters.NewNodeLister(nodeIndexer) + + // This should trigger the needsUpdate false check since the service equals the cached service + return svc + }, + expectedFn: func(svc *v1.Service, err error, retryDuration time.Duration) error { + if err != nil { + return err + } + if retryDuration != doNotRetry { + return fmt.Errorf("retryDuration Expected=%v Obtained=%v", doNotRetry, retryDuration) + } + + if len(cloud.UpdateCalls) != 1 { + return fmt.Errorf("expected one update host call but only got %+v", cloud.UpdateCalls) + } + + actualCall := cloud.UpdateCalls[0] + if !reflect.DeepEqual(svc, actualCall.Service) { + return fmt.Errorf("expected update call to contain service %+v, got %+v", svc, actualCall.Service) + } + + sortNodesByName(actualCall.Hosts) + if !reflect.DeepEqual(nodes, actualCall.Hosts) { + return fmt.Errorf("expected update call to contain hosts %+v, got %+v", nodes, actualCall.Hosts) + } + + return nil + }, + }, { testName: "If Updating Loadbalancer IP", key: "default/sync-test-name", From 26f9dd7b3e66f10b47163f88eac00ebc33ae0957 Mon Sep 17 00:00:00 2001 From: Josh Horwitz Date: Wed, 8 Nov 2017 14:46:25 -0500 Subject: [PATCH 2/2] generated files --- pkg/controller/service/BUILD | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/controller/service/BUILD b/pkg/controller/service/BUILD index 11766c49ca0..67d27cdf8f6 100644 --- a/pkg/controller/service/BUILD +++ b/pkg/controller/service/BUILD @@ -50,6 +50,8 @@ go_test( "//vendor/k8s.io/apimachinery/pkg/types:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library", "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library", + "//vendor/k8s.io/client-go/listers/core/v1:go_default_library", + "//vendor/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/client-go/tools/record:go_default_library", 
], )