diff --git a/federation/pkg/federation-controller/service/service_helper.go b/federation/pkg/federation-controller/service/service_helper.go index 887769bec29..93672290b6c 100644 --- a/federation/pkg/federation-controller/service/service_helper.go +++ b/federation/pkg/federation-controller/service/service_helper.go @@ -131,9 +131,13 @@ func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache if isDeletion { // cachedService is not reliable here as // deleting cache is the last step of federation service deletion - _, err := fedClient.Core().Services(cachedService.lastState.Namespace).Get(cachedService.lastState.Name, metav1.GetOptions{}) + service, err := fedClient.Core().Services(cachedService.lastState.Namespace).Get(cachedService.lastState.Name, metav1.GetOptions{}) // rebuild service if federation service still exists if err == nil || !errors.IsNotFound(err) { + if err == nil && service.DeletionTimestamp != nil { + glog.V(4).Infof("Skipping sync of service %v in underlying clusters as it has already been marked for deletion", name) + return nil + } return sc.ensureClusterService(cachedService, clusterName, cachedService.appliedState, clusterCache.clientset) } } diff --git a/federation/pkg/federation-controller/service/servicecontroller.go b/federation/pkg/federation-controller/service/servicecontroller.go index c765bb76410..78132ee05b0 100644 --- a/federation/pkg/federation-controller/service/servicecontroller.go +++ b/federation/pkg/federation-controller/service/servicecontroller.go @@ -487,6 +487,10 @@ func wantsDNSRecords(service *v1.Service) bool { // update DNS records and update the service info with DNS entries to federation apiserver. // the function returns any error caught func (s *ServiceController) processServiceForCluster(cachedService *cachedService, clusterName string, service *v1.Service, client *kubeclientset.Clientset) error { + if service.DeletionTimestamp != nil { + glog.V(4).Infof("Service has already been marked for deletion %v", service.Name) + return nil + } glog.V(4).Infof("Process service %s/%s for cluster %s", service.Namespace, service.Name, clusterName) // Create or Update k8s Service err := s.ensureClusterService(cachedService, clusterName, service, client) @@ -508,15 +512,19 @@ func (s *ServiceController) updateFederationService(key string, cachedService *c } // handle available clusters one by one - var hasErr bool + hasErr := false + var wg sync.WaitGroup for clusterName, cache := range s.clusterCache.clientMap { + wg.Add(1) go func(cache *clusterCache, clusterName string) { + defer wg.Done() err := s.processServiceForCluster(cachedService, clusterName, desiredService, cache.clientset) if err != nil { hasErr = true } }(cache, clusterName) } + wg.Wait() if hasErr { // detail error has been dumped inside the loop return fmt.Errorf("Service %s/%s was not successfully updated to all clusters", desiredService.Namespace, desiredService.Name), retryable