diff --git a/pkg/controller/service/service_controller.go b/pkg/controller/service/service_controller.go
index c01ccbd9d3e..e32a4c45326 100644
--- a/pkg/controller/service/service_controller.go
+++ b/pkg/controller/service/service_controller.go
@@ -92,8 +92,6 @@ const (
 )
 
 type cachedService struct {
-	// Protects cached service state
-	mu sync.Mutex
 	// The cached state of the service
 	state *v1.Service
 }
@@ -108,7 +106,7 @@ type serviceCache struct {
 type ServiceController struct {
 	cloud            cloudprovider.Interface
 	knownHosts       []*v1.Node
-	servicesToUpdate []string
+	servicesToUpdate []*v1.Service
 	kubeClient       clientset.Interface
 	clusterName      string
 	balancer         cloudprovider.LoadBalancer
@@ -279,8 +277,6 @@ func (s *ServiceController) processServiceCreateOrUpdate(service *v1.Service, ke
 	// TODO(@MrHohn): Remove the cache once we get rid of the non-finalizer deletion
 	// path. Ref https://github.com/kubernetes/enhancements/issues/980.
 	cachedService := s.cache.getOrCreate(key)
-	cachedService.mu.Lock()
-	defer cachedService.mu.Unlock()
 	if cachedService.state != nil && cachedService.state.UID != service.UID {
 		// This happens only when a service is deleted and re-created
 		// in a short period, which is only possible when it doesn't
@@ -333,10 +329,8 @@ func (s *ServiceController) syncLoadBalancerIfNeeded(service *v1.Service, key st
 	if err != nil {
 		return op, fmt.Errorf("failed to check if load balancer exists before cleanup: %v", err)
 	}
-	// Always call EnsureLoadBalancerDeleted against load balancers to
-	// ensure the underlying components are completely deleted
-	if exists || service.Spec.Type == v1.ServiceTypeLoadBalancer {
-		klog.V(2).Infof("Deleting load balancer for service %s", key)
+	if exists {
+		klog.V(2).Infof("Deleting existing load balancer for service %s", key)
 		s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
 		if err := s.balancer.EnsureLoadBalancerDeleted(context.TODO(), s.clusterName, service); err != nil {
 			return op, fmt.Errorf("failed to delete load balancer: %v", err)
@@ -431,6 +425,18 @@ func (s *serviceCache) GetByKey(key string) (interface{}, bool, error) {
 	return nil, false, nil
 }
 
+// allServices returns a copy of the services held in the cache so that
+// callers can iterate over them without holding the cache lock.
+func (s *serviceCache) allServices() []*v1.Service {
+	s.mu.RLock()
+	defer s.mu.RUnlock()
+	services := make([]*v1.Service, 0, len(s.serviceMap))
+	for _, v := range s.serviceMap {
+		services = append(services, v.state)
+	}
+	return services
+}
+
 func (s *serviceCache) get(serviceName string) (*cachedService, bool) {
 	s.mu.RLock()
 	defer s.mu.RUnlock()
@@ -686,7 +692,7 @@ func (s *ServiceController) nodeSyncLoop() {
 
 	// Try updating all services, and save the ones that fail to try again next
 	// round.
-	s.servicesToUpdate = s.cache.ListKeys()
+	s.servicesToUpdate = s.cache.allServices()
 	numServices := len(s.servicesToUpdate)
 	s.servicesToUpdate = s.updateLoadBalancerHosts(s.servicesToUpdate, newHosts)
 	klog.V(2).Infof("Successfully updated %d out of %d load balancers to direct traffic to the updated set of nodes",
@@ -698,21 +704,15 @@ func (s *ServiceController) nodeSyncLoop() {
 // updateLoadBalancerHosts updates all existing load balancers so that
 // they will match the list of hosts provided.
 // Returns the list of services that couldn't be updated.
-func (s *ServiceController) updateLoadBalancerHosts(keys []string, hosts []*v1.Node) (servicesToRetry []string) {
-	for _, key := range keys {
+func (s *ServiceController) updateLoadBalancerHosts(services []*v1.Service, hosts []*v1.Node) (servicesToRetry []*v1.Service) {
+	for _, service := range services {
 		func() {
-			cachedService, ok := s.cache.get(key)
-			// Check if we already deleted the load balancer that was created for the service
-			if !ok {
+			if service == nil {
 				return
 			}
-
-			cachedService.mu.Lock()
-			defer cachedService.mu.Unlock()
-			service := cachedService.state
 			if err := s.lockedUpdateLoadBalancerHosts(service, hosts); err != nil {
 				runtime.HandleError(fmt.Errorf("failed to update load balancer hosts for service %s/%s: %v", service.Namespace, service.Name, err))
-				servicesToRetry = append(servicesToRetry, key)
+				servicesToRetry = append(servicesToRetry, service)
 			}
 		}()
 	}
@@ -722,7 +722,7 @@ func (s *ServiceController) updateLoadBalancerHosts(keys []string, hosts []*v1.N
 // Updates the load balancer of a service, assuming we hold the mutex
 // associated with the service.
 func (s *ServiceController) lockedUpdateLoadBalancerHosts(service *v1.Service, hosts []*v1.Node) error {
-	if needsCleanup(service) {
+	if !wantsLoadBalancer(service) {
 		return nil
 	}
 
@@ -800,8 +800,6 @@ func (s *ServiceController) processServiceDeletion(key string) error {
 		// In both cases we have nothing left to do.
 		return nil
 	}
-	cachedService.mu.Lock()
-	defer cachedService.mu.Unlock()
 	klog.V(2).Infof("Service %v has been deleted. Attempting to cleanup load balancer resources", key)
 	if err := s.processLoadBalancerDelete(cachedService.state, key); err != nil {
 		return err
diff --git a/pkg/controller/service/service_controller_test.go b/pkg/controller/service/service_controller_test.go
index a251af59119..93d8c482889 100644
--- a/pkg/controller/service/service_controller_test.go
+++ b/pkg/controller/service/service_controller_test.go
@@ -49,11 +49,10 @@ const region = "us-central"
 func newService(name string, uid types.UID, serviceType v1.ServiceType) *v1.Service {
 	return &v1.Service{
 		ObjectMeta: metav1.ObjectMeta{
-			Name:       name,
-			Namespace:  "default",
-			UID:        uid,
-			SelfLink:   testapi.Default.SelfLink("services", name),
-			Finalizers: []string{servicehelper.LoadBalancerCleanupFinalizer},
+			Name:      name,
+			Namespace: "default",
+			UID:       uid,
+			SelfLink:  testapi.Default.SelfLink("services", name),
 		},
 		Spec: v1.ServiceSpec{
 			Type: serviceType,
@@ -61,12 +60,6 @@ func newService(name string, uid types.UID, serviceType v1.ServiceType) *v1.Serv
 	}
 }
 
-func newServiceWithDeletionTimestamp(name string, uid types.UID, serviceType v1.ServiceType, deletionTimestamp metav1.Time) *v1.Service {
-	service := newService(name, uid, serviceType)
-	service.ObjectMeta.DeletionTimestamp = &deletionTimestamp
-	return service
-}
-
 //Wrap newService so that you don't have to call default arguments again and again.
 func defaultExternalService() *v1.Service {
 	return newService("external-balancer", types.UID("123"), v1.ServiceTypeLoadBalancer)
@@ -310,30 +303,6 @@ func TestSyncLoadBalancerIfNeeded(t *testing.T) {
 			expectPatchStatus:    true,
 			expectPatchFinalizer: false,
 		},
-		{
-			desc: "service that needs cleanup but LB does not exist",
-			service: &v1.Service{
-				ObjectMeta: metav1.ObjectMeta{
-					Name:       "basic-service1",
-					Namespace:  "default",
-					SelfLink:   testapi.Default.SelfLink("services", "basic-service1"),
-					Finalizers: []string{servicehelper.LoadBalancerCleanupFinalizer},
-					DeletionTimestamp: &metav1.Time{
-						Time: time.Now(),
-					},
-				},
-				Spec: v1.ServiceSpec{
-					Ports: []v1.ServicePort{{
-						Port:     80,
-						Protocol: v1.ProtocolTCP,
-					}},
-					Type: v1.ServiceTypeLoadBalancer,
-				},
-			},
-			expectOp:            deleteLoadBalancer,
-			expectDeleteAttempt: true,
-			expectPatchStatus:   true,
-		},
 	}
 
 	for _, tc := range testCases {
@@ -439,7 +408,6 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
 	table := []struct {
 		services            []*v1.Service
 		expectedUpdateCalls []fakecloud.UpdateBalancerCall
-		shouldNotBeInCache  bool
 	}{
 		{
 			// No services present: no calls should be made.
@@ -499,39 +467,16 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) {
 				{Service: newService("s0", "234", v1.ServiceTypeLoadBalancer), Hosts: nodes},
 			},
 		},
-		{
-			// One service has an external load balancer but is not in the cache (it is deleted): no calls should be made.
-			services: []*v1.Service{
-				newService("s0", "567", v1.ServiceTypeLoadBalancer),
-			},
-			expectedUpdateCalls: nil,
-			shouldNotBeInCache:  true,
-		},
-		{
-			// One service has a deleteion timestamp: no calls should be made.
-			services: []*v1.Service{
-				newServiceWithDeletionTimestamp("s0", "999", v1.ServiceTypeLoadBalancer, metav1.Now()),
-			},
-			expectedUpdateCalls: nil,
-		},
 	}
 	for _, item := range table {
 		controller, cloud, _ := newController()
-		// Use keys array instead of cache.ListKeys() so that the order of keys is deterministic
-		keys := make([]string, 0, len(item.services))
-		for _, service := range item.services {
-			if service == nil || item.shouldNotBeInCache {
-				continue
-			}
-			key := service.GetNamespace() + "/" + service.GetName()
-			keys = append(keys, key)
-			// Manually populate cache because updateLoadBalancerHosts gets services from cache
-			svc := controller.cache.getOrCreate(key)
-			svc.state = service
-		}
+		var services []*v1.Service
+		services = append(services, item.services...)
-		controller.updateLoadBalancerHosts(keys, nodes)
+		if err := controller.updateLoadBalancerHosts(services, nodes); err != nil {
+			t.Errorf("unexpected error: %v", err)
+		}
 
 		if !reflect.DeepEqual(item.expectedUpdateCalls, cloud.UpdateCalls) {
 			t.Errorf("expected update calls mismatch, expected %+v, got %+v", item.expectedUpdateCalls, cloud.UpdateCalls)
 		}
 	}
@@ -1198,6 +1143,18 @@ func TestServiceCache(t *testing.T) {
 				return nil
 			},
 		},
+		{
+			testName:   "allServices",
+			setCacheFn: nil, // Nothing to set
+			checkCacheFn: func() error {
+				// It should return two elements
+				svcArray := sc.allServices()
+				if len(svcArray) != 2 {
+					return fmt.Errorf("Expected(2) Obtained(%v)", len(svcArray))
+				}
+				return nil
+			},
+		},
 	}
 
 	for _, tc := range testCases {
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_loadbalancer.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_loadbalancer.go
index dcbde5d78c0..05aa5adfa4d 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_loadbalancer.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_loadbalancer.go
@@ -104,18 +104,38 @@ const (
 	clusterNameKey = "kubernetes-cluster-name"
 )
 
-// GetLoadBalancer returns whether the specified load balancer exists, and
+// GetLoadBalancer returns whether the specified load balancer and its components exist, and
 // if so, what its status is.
 func (az *Cloud) GetLoadBalancer(ctx context.Context, clusterName string, service *v1.Service) (status *v1.LoadBalancerStatus, exists bool, err error) {
+	// Since the public IP is not a part of the load balancer on Azure,
+	// there is a chance that we could orphan public IP resources while we delete the load balancer (kubernetes/kubernetes#80571).
+	// We need to make sure the existence of the load balancer depends on both the load balancer resource and the public IP resource on Azure.
+	existsPip := func() bool {
+		pipName, _, err := az.determinePublicIPName(clusterName, service)
+		if err != nil {
+			return false
+		}
+		pipResourceGroup := az.getPublicIPAddressResourceGroup(service)
+		_, existsPip, err := az.getPublicIPAddress(pipResourceGroup, pipName)
+		if err != nil {
+			return false
+		}
+		return existsPip
+	}()
+
+	_, status, existsLb, err := az.getServiceLoadBalancer(service, clusterName, nil, false)
 	if err != nil {
-		return nil, false, err
+		return nil, existsPip, err
 	}
-	if !exists {
+
+	// Return exists = false only if neither the load balancer nor the public IP is found on Azure
+	if !existsLb && !existsPip {
 		serviceName := getServiceName(service)
 		klog.V(5).Infof("getloadbalancer (cluster:%s) (service:%s) - doesn't exist", clusterName, serviceName)
 		return nil, false, nil
 	}
+
+	// Return exists = true if either the load balancer or the public IP (or both) exists
 	return status, true, nil
 }
@@ -169,6 +189,10 @@ func (az *Cloud) EnsureLoadBalancer(ctx context.Context, clusterName string, ser
 
 // UpdateLoadBalancer updates hosts under the specified load balancer.
 func (az *Cloud) UpdateLoadBalancer(ctx context.Context, clusterName string, service *v1.Service, nodes []*v1.Node) error {
+	if !az.shouldUpdateLoadBalancer(clusterName, service) {
+		klog.V(2).Infof("UpdateLoadBalancer: skipping service %s because it is either being deleted or does not exist anymore", service.Name)
+		return nil
+	}
 	_, err := az.EnsureLoadBalancer(ctx, clusterName, service, nodes)
 	return err
 }
@@ -475,7 +499,7 @@ func (az *Cloud) findServiceIPAddress(ctx context.Context, clusterName string, s
 		return service.Spec.LoadBalancerIP, nil
 	}
 
-	lbStatus, existsLb, err := az.GetLoadBalancer(ctx, clusterName, service)
+	_, lbStatus, existsLb, err := az.getServiceLoadBalancer(service, clusterName, nil, false)
 	if err != nil {
 		return "", err
 	}
@@ -1283,6 +1307,11 @@ func (az *Cloud) reconcileSecurityGroup(clusterName string, service *v1.Service,
 	return &sg, nil
 }
 
+func (az *Cloud) shouldUpdateLoadBalancer(clusterName string, service *v1.Service) bool {
+	_, _, existsLb, _ := az.getServiceLoadBalancer(service, clusterName, nil, false)
+	return existsLb && service.ObjectMeta.DeletionTimestamp == nil
+}
+
 func logSafe(s *string) string {
 	if s == nil {
 		return "(nil)"
diff --git a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_loadbalancer_test.go b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_loadbalancer_test.go
index 22284569ba0..808b3579010 100644
--- a/staging/src/k8s.io/legacy-cloud-providers/azure/azure_loadbalancer_test.go
+++ b/staging/src/k8s.io/legacy-cloud-providers/azure/azure_loadbalancer_test.go
@@ -25,6 +25,7 @@ import (
 	"strconv"
 	"strings"
 	"testing"
+	"time"
 
 	"github.com/Azure/azure-sdk-for-go/services/network/mgmt/2019-06-01/network"
 	"github.com/Azure/go-autorest/autorest/to"
@@ -1895,3 +1896,66 @@ func TestEnsurePublicIPExists(t *testing.T) {
 		assert.Equal(t, test.expectedError, err != nil, "TestCase[%d]: %s", i, test.desc)
 	}
 }
+
+func TestShouldUpdateLoadBalancer(t *testing.T) {
+	testCases := []struct {
+		desc                   string
+		lbHasDeletionTimestamp bool
+		existsLb               bool
+		expectedOutput         bool
+	}{
+		{
+			desc:                   "should update a load balancer that does not have a deletion timestamp and exists in Azure",
+			lbHasDeletionTimestamp: false,
+			existsLb:               true,
+			expectedOutput:         true,
+		},
+		{
+			desc:                   "should not update a load balancer that is being deleted / already deleted in K8s",
+			lbHasDeletionTimestamp: true,
+			existsLb:               true,
+			expectedOutput:         false,
+		},
+		{
+			desc:                   "should not update a load balancer that does not exist in Azure",
+			lbHasDeletionTimestamp: false,
+			existsLb:               false,
+			expectedOutput:         false,
+		},
+		{
+			desc:                   "should not update a load balancer that has a deletion timestamp and does not exist in Azure",
+			lbHasDeletionTimestamp: true,
+			existsLb:               false,
+			expectedOutput:         false,
+		},
+	}
+
+	for i, test := range testCases {
+		az := getTestCloud()
+		service := getTestService("test1", v1.ProtocolTCP, nil, 80)
+		if test.lbHasDeletionTimestamp {
+			service.ObjectMeta.DeletionTimestamp = &metav1.Time{Time: time.Now()}
+		}
+		if test.existsLb {
+			lb := network.LoadBalancer{
+				Name: to.StringPtr("lb1"),
+				LoadBalancerPropertiesFormat: &network.LoadBalancerPropertiesFormat{
+					FrontendIPConfigurations: &[]network.FrontendIPConfiguration{
+						{
+							Name: to.StringPtr("atest1"),
+							FrontendIPConfigurationPropertiesFormat: &network.FrontendIPConfigurationPropertiesFormat{
+								PublicIPAddress: &network.PublicIPAddress{ID: to.StringPtr("id1")},
+							},
+						},
+					},
+				},
+			}
+			_, err := az.LoadBalancerClient.CreateOrUpdate(context.TODO(), "rg", *lb.Name, lb, "")
+			if err != nil {
t.Fatalf("TestCase[%d] meets unexpected error: %v", i, err) + } + } + shouldUpdateLoadBalancer := az.shouldUpdateLoadBalancer(testClusterName, &service) + assert.Equal(t, test.expectedOutput, shouldUpdateLoadBalancer, "TestCase[%d]: %s", i, test.desc) + } +}