From 3769435a452c994cf54de89c30c00efd84e1b53f Mon Sep 17 00:00:00 2001 From: Christian Bell Date: Thu, 16 Mar 2017 10:17:45 -0700 Subject: [PATCH] Fix deletion logic in service controller. This is a regression from 1.5 exposed by cascading deletions. In order to apply updates, the service controller locks access to a cached service and spawns goroutines without waiting for them. When updates and deletions arrive in quick succession, previous goroutines remain active and race with the deletion logic. Coupled with this, the service_helper was not re-evaluating the value of the DeletionTimestamp. Without this patch, federation will sometimes leak resources at destruction time. --- .../federation-controller/service/service_helper.go | 6 +++++- .../federation-controller/service/servicecontroller.go | 10 +++++++++- 2 files changed, 14 insertions(+), 2 deletions(-) diff --git a/federation/pkg/federation-controller/service/service_helper.go b/federation/pkg/federation-controller/service/service_helper.go index 887769bec29..93672290b6c 100644 --- a/federation/pkg/federation-controller/service/service_helper.go +++ b/federation/pkg/federation-controller/service/service_helper.go @@ -131,9 +131,13 @@ func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache if isDeletion { // cachedService is not reliable here as // deleting cache is the last step of federation service deletion - _, err := fedClient.Core().Services(cachedService.lastState.Namespace).Get(cachedService.lastState.Name, metav1.GetOptions{}) + service, err := fedClient.Core().Services(cachedService.lastState.Namespace).Get(cachedService.lastState.Name, metav1.GetOptions{}) // rebuild service if federation service still exists if err == nil || !errors.IsNotFound(err) { + if err == nil && service.DeletionTimestamp != nil { + glog.V(4).Infof("Skipping sync of service %v in underlying clusters as it has already been marked for deletion", name) + return nil + } return 
sc.ensureClusterService(cachedService, clusterName, cachedService.appliedState, clusterCache.clientset) } } diff --git a/federation/pkg/federation-controller/service/servicecontroller.go b/federation/pkg/federation-controller/service/servicecontroller.go index c765bb76410..78132ee05b0 100644 --- a/federation/pkg/federation-controller/service/servicecontroller.go +++ b/federation/pkg/federation-controller/service/servicecontroller.go @@ -487,6 +487,10 @@ func wantsDNSRecords(service *v1.Service) bool { // update DNS records and update the service info with DNS entries to federation apiserver. // the function returns any error caught func (s *ServiceController) processServiceForCluster(cachedService *cachedService, clusterName string, service *v1.Service, client *kubeclientset.Clientset) error { + if service.DeletionTimestamp != nil { + glog.V(4).Infof("Service has already been marked for deletion %v", service.Name) + return nil + } glog.V(4).Infof("Process service %s/%s for cluster %s", service.Namespace, service.Name, clusterName) // Create or Update k8s Service err := s.ensureClusterService(cachedService, clusterName, service, client) @@ -508,15 +512,19 @@ func (s *ServiceController) updateFederationService(key string, cachedService *c } // handle available clusters one by one - var hasErr bool + hasErr := false + var wg sync.WaitGroup for clusterName, cache := range s.clusterCache.clientMap { + wg.Add(1) go func(cache *clusterCache, clusterName string) { + defer wg.Done() err := s.processServiceForCluster(cachedService, clusterName, desiredService, cache.clientset) if err != nil { hasErr = true } }(cache, clusterName) } + wg.Wait() if hasErr { // detail error has been dumped inside the loop return fmt.Errorf("Service %s/%s was not successfully updated to all clusters", desiredService.Namespace, desiredService.Name), retryable