From c8c88cefadb035fc92809fc70d5d44f7777bf761 Mon Sep 17 00:00:00 2001 From: Ernest Wong Date: Tue, 17 Sep 2019 15:24:28 -0700 Subject: [PATCH] Update service controller to prevent orphaned public IP addresses --- pkg/controller/service/service_controller.go | 44 +++++----- .../service/service_controller_test.go | 85 ++++++++++++++----- 2 files changed, 87 insertions(+), 42 deletions(-) diff --git a/pkg/controller/service/service_controller.go b/pkg/controller/service/service_controller.go index e32a4c45326..c01ccbd9d3e 100644 --- a/pkg/controller/service/service_controller.go +++ b/pkg/controller/service/service_controller.go @@ -92,6 +92,8 @@ const ( ) type cachedService struct { + // Protects cached service state + mu sync.Mutex // The cached state of the service state *v1.Service } @@ -106,7 +108,7 @@ type serviceCache struct { type ServiceController struct { cloud cloudprovider.Interface knownHosts []*v1.Node - servicesToUpdate []*v1.Service + servicesToUpdate []string kubeClient clientset.Interface clusterName string balancer cloudprovider.LoadBalancer @@ -277,6 +279,8 @@ func (s *ServiceController) processServiceCreateOrUpdate(service *v1.Service, ke // TODO(@MrHohn): Remove the cache once we get rid of the non-finalizer deletion // path. Ref https://github.com/kubernetes/enhancements/issues/980. 
cachedService := s.cache.getOrCreate(key) + cachedService.mu.Lock() + defer cachedService.mu.Unlock() if cachedService.state != nil && cachedService.state.UID != service.UID { // This happens only when a service is deleted and re-created // in a short period, which is only possible when it doesn't @@ -329,8 +333,10 @@ func (s *ServiceController) syncLoadBalancerIfNeeded(service *v1.Service, key st if err != nil { return op, fmt.Errorf("failed to check if load balancer exists before cleanup: %v", err) } - if exists { - klog.V(2).Infof("Deleting existing load balancer for service %s", key) + // Always call EnsureLoadBalancerDeleted against load balancers to + // ensure the underlying components are completely deleted + if exists || service.Spec.Type == v1.ServiceTypeLoadBalancer { + klog.V(2).Infof("Deleting load balancer for service %s", key) s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer") if err := s.balancer.EnsureLoadBalancerDeleted(context.TODO(), s.clusterName, service); err != nil { return op, fmt.Errorf("failed to delete load balancer: %v", err) @@ -425,18 +431,6 @@ func (s *serviceCache) GetByKey(key string) (interface{}, bool, error) { return nil, false, nil } -// ListKeys implements the interface required by DeltaFIFO to list the keys we -// already know about. -func (s *serviceCache) allServices() []*v1.Service { - s.mu.RLock() - defer s.mu.RUnlock() - services := make([]*v1.Service, 0, len(s.serviceMap)) - for _, v := range s.serviceMap { - services = append(services, v.state) - } - return services -} - func (s *serviceCache) get(serviceName string) (*cachedService, bool) { s.mu.RLock() defer s.mu.RUnlock() @@ -692,7 +686,7 @@ func (s *ServiceController) nodeSyncLoop() { // Try updating all services, and save the ones that fail to try again next // round. 
- s.servicesToUpdate = s.cache.allServices() + s.servicesToUpdate = s.cache.ListKeys() numServices := len(s.servicesToUpdate) s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts) klog.V(2).Infof("Successfully updated %d out of %d load balancers to direct traffic to the updated set of nodes", @@ -704,15 +698,21 @@ func (s *ServiceController) nodeSyncLoop() { // updateLoadBalancerHosts updates all existing load balancers so that // they will match the list of hosts provided. // Returns the list of services that couldn't be updated. -func (s *ServiceController) updateLoadBalancerHosts(services []*v1.Service, hosts []*v1.Node) (servicesToRetry []*v1.Service) { - for _, service := range services { +func (s *ServiceController) updateLoadBalancerHosts(keys []string, hosts []*v1.Node) (servicesToRetry []string) { + for _, key := range keys { func() { - if service == nil { + cachedService, ok := s.cache.get(key) + // Check if we already deleted the load balancer that was created for the service + if !ok { return } + + cachedService.mu.Lock() + defer cachedService.mu.Unlock() + service := cachedService.state if err := s.lockedUpdateLoadBalancerHosts(service, hosts); err != nil { runtime.HandleError(fmt.Errorf("failed to update load balancer hosts for service %s/%s: %v", service.Namespace, service.Name, err)) - servicesToRetry = append(servicesToRetry, service) + servicesToRetry = append(servicesToRetry, key) } }() } @@ -722,7 +722,7 @@ func (s *ServiceController) updateLoadBalancerHosts(services []*v1.Service, host // Updates the load balancer of a service, assuming we hold the mutex // associated with the service. func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *v1.Service, hosts []*v1.Node) error { - if !wantsLoadBalancer(service) { + if needsCleanup(service) { return nil } @@ -800,6 +800,8 @@ func (s *ServiceController) processServiceDeletion(key string) error { // In both cases we have nothing left to do. 
return nil } + cachedService.mu.Lock() + defer cachedService.mu.Unlock() klog.V(2).Infof("Service %v has been deleted. Attempting to cleanup load balancer resources", key) if err := s.processLoadBalancerDelete(cachedService.state, key); err != nil { return err diff --git a/pkg/controller/service/service_controller_test.go b/pkg/controller/service/service_controller_test.go index 93d8c482889..a251af59119 100644 --- a/pkg/controller/service/service_controller_test.go +++ b/pkg/controller/service/service_controller_test.go @@ -49,10 +49,11 @@ const region = "us-central" func newService(name string, uid types.UID, serviceType v1.ServiceType) *v1.Service { return &v1.Service{ ObjectMeta: metav1.ObjectMeta{ - Name: name, - Namespace: "default", - UID: uid, - SelfLink: testapi.Default.SelfLink("services", name), + Name: name, + Namespace: "default", + UID: uid, + SelfLink: testapi.Default.SelfLink("services", name), + Finalizers: []string{servicehelper.LoadBalancerCleanupFinalizer}, }, Spec: v1.ServiceSpec{ Type: serviceType, @@ -60,6 +61,12 @@ func newService(name string, uid types.UID, serviceType v1.ServiceType) *v1.Serv } } +func newServiceWithDeletionTimestamp(name string, uid types.UID, serviceType v1.ServiceType, deletionTimestamp metav1.Time) *v1.Service { + service := newService(name, uid, serviceType) + service.ObjectMeta.DeletionTimestamp = &deletionTimestamp + return service +} + //Wrap newService so that you don't have to call default arguments again and again. 
func defaultExternalService() *v1.Service { return newService("external-balancer", types.UID("123"), v1.ServiceTypeLoadBalancer) @@ -303,6 +310,30 @@ func TestSyncLoadBalancerIfNeeded(t *testing.T) { expectPatchStatus: true, expectPatchFinalizer: false, }, + { + desc: "service that needs cleanup but LB does not exist", + service: &v1.Service{ + ObjectMeta: metav1.ObjectMeta{ + Name: "basic-service1", + Namespace: "default", + SelfLink: testapi.Default.SelfLink("services", "basic-service1"), + Finalizers: []string{servicehelper.LoadBalancerCleanupFinalizer}, + DeletionTimestamp: &metav1.Time{ + Time: time.Now(), + }, + }, + Spec: v1.ServiceSpec{ + Ports: []v1.ServicePort{{ + Port: 80, + Protocol: v1.ProtocolTCP, + }}, + Type: v1.ServiceTypeLoadBalancer, + }, + }, + expectOp: deleteLoadBalancer, + expectDeleteAttempt: true, + expectPatchStatus: true, + }, } for _, tc := range testCases { @@ -408,6 +439,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) { table := []struct { services []*v1.Service expectedUpdateCalls []fakecloud.UpdateBalancerCall + shouldNotBeInCache bool }{ { // No services present: no calls should be made. @@ -467,16 +499,39 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) { {Service: newService("s0", "234", v1.ServiceTypeLoadBalancer), Hosts: nodes}, }, }, + { + // One service has an external load balancer but is not in the cache (it is deleted): no calls should be made. + services: []*v1.Service{ + newService("s0", "567", v1.ServiceTypeLoadBalancer), + }, + expectedUpdateCalls: nil, + shouldNotBeInCache: true, + }, + { + // One service has a deletion timestamp: no calls should be made. + services: []*v1.Service{ + newServiceWithDeletionTimestamp("s0", "999", v1.ServiceTypeLoadBalancer, metav1.Now()), + }, + expectedUpdateCalls: nil, + }, } for _, item := range table { controller, cloud, _ := newController() - var services []*v1.Service - services = append(services, item.services...) 
- - if err := controller.updateLoadBalancerHosts(services, nodes); err != nil { - t.Errorf("unexpected error: %v", err) + // Use keys array instead of cache.ListKeys() so that the order of keys is deterministic + keys := make([]string, 0, len(item.services)) + for _, service := range item.services { + if service == nil || item.shouldNotBeInCache { + continue + } + key := service.GetNamespace() + "/" + service.GetName() + keys = append(keys, key) + // Manually populate cache because updateLoadBalancerHosts gets services from cache + svc := controller.cache.getOrCreate(key) + svc.state = service } + + controller.updateLoadBalancerHosts(keys, nodes) if !reflect.DeepEqual(item.expectedUpdateCalls, cloud.UpdateCalls) { t.Errorf("expected update calls mismatch, expected %+v, got %+v", item.expectedUpdateCalls, cloud.UpdateCalls) } @@ -1143,18 +1198,6 @@ func TestServiceCache(t *testing.T) { return nil }, }, - { - testName: "allServices", - setCacheFn: nil, //Nothing to set - checkCacheFn: func() error { - //It should return two elements - svcArray := sc.allServices() - if len(svcArray) != 2 { - return fmt.Errorf("Expected(2) Obtained(%v)", len(svcArray)) - } - return nil - }, - }, } for _, tc := range testCases {