Merge pull request #43231 from csbell/service-race

Automatic merge from submit-queue

[Federation] Fix deletion logic in service controller

This is a regression from 1.5 exposed by cascading deletions. In order to apply updates, the service controller locks access to a cached service and spawns goroutines without waiting for them to finish. When updates and deletions arrive in quick succession, the earlier goroutines remain active and race with the deletion logic. On top of this, the service_helper was not re-evaluating the DeletionTimestamp.
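
As an illustration of the pattern the fix moves to, here is a minimal standalone Go sketch of fanning out per-cluster work and waiting for it before returning. The names (updateClusters, process) are invented for the example and are not the controller's actual types, and the mutex guarding the error flag is only there to keep the sketch itself race-free:

    package main

    import (
        "fmt"
        "sync"
    )

    // updateClusters fans out one goroutine per cluster and, crucially, waits
    // for all of them before returning, so no goroutine can outlive the update
    // and race with a deletion that arrives right after it.
    func updateClusters(clusters []string, process func(cluster string) error) error {
        var (
            wg     sync.WaitGroup
            mu     sync.Mutex
            hasErr bool
        )
        for _, cluster := range clusters {
            wg.Add(1)
            go func(cluster string) {
                defer wg.Done()
                if err := process(cluster); err != nil {
                    mu.Lock()
                    hasErr = true
                    mu.Unlock()
                }
            }(cluster)
        }
        // Without this wait the function returns while per-cluster goroutines
        // are still running -- the shape of the original race.
        wg.Wait()
        if hasErr {
            return fmt.Errorf("service was not successfully updated to all clusters")
        }
        return nil
    }

    func main() {
        err := updateClusters([]string{"cluster-a", "cluster-b"}, func(cluster string) error {
            fmt.Println("reconciling service in", cluster)
            return nil
        })
        fmt.Println("done, err =", err)
    }

Waiting on the WaitGroup before returning is what guarantees that no per-cluster goroutine can outlive the update and race with a deletion that arrives immediately afterwards.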

Without this patch, federation leaks resources at destruction time roughly half the time.

In e2e land, roughly 4-5 test runs are enough for the service tests to eat up all global forwarding rules; after that, every subsequent ingress test fails until we manually clean up the leaked resources. There is no chance of going green in federation e2e until this is merged.
Kubernetes Submit Queue authored on 2017-03-20 00:19:23 -07:00; committed by GitHub.
2 changed files with 14 additions and 2 deletions

@@ -131,9 +131,13 @@ func (cc *clusterClientCache) syncService(key, clusterName string, clusterCache
     if isDeletion {
         // cachedService is not reliable here as
         // deleting cache is the last step of federation service deletion
-        _, err := fedClient.Core().Services(cachedService.lastState.Namespace).Get(cachedService.lastState.Name, metav1.GetOptions{})
+        service, err := fedClient.Core().Services(cachedService.lastState.Namespace).Get(cachedService.lastState.Name, metav1.GetOptions{})
         // rebuild service if federation service still exists
         if err == nil || !errors.IsNotFound(err) {
+            if err == nil && service.DeletionTimestamp != nil {
+                glog.V(4).Infof("Skipping sync of service %v in underlying clusters as it has already been marked for deletion", name)
+                return nil
+            }
             return sc.ensureClusterService(cachedService, clusterName, cachedService.appliedState, clusterCache.clientset)
         }
     }
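
To make the rule this hunk adds easier to see in isolation, here is a minimal sketch of the same decision; fakeService and shouldRebuild are invented for the example and mirror only the one field the patch consults:

    package main

    import (
        "fmt"
        "time"
    )

    // fakeService mirrors only the field the patch consults; the real code
    // reads DeletionTimestamp from the federated Service's ObjectMeta.
    type fakeService struct {
        Name              string
        DeletionTimestamp *time.Time
    }

    // shouldRebuild applies the hunk's rule: rebuild the service in member
    // clusters only if the federated service still exists and has not been
    // marked for deletion.
    func shouldRebuild(svc *fakeService, getErr error, isNotFound bool) bool {
        if getErr != nil && isNotFound {
            return false // federated service is already gone
        }
        if getErr == nil && svc.DeletionTimestamp != nil {
            return false // deletion already in progress; let it finish
        }
        return true
    }

    func main() {
        now := time.Now()
        fmt.Println(shouldRebuild(&fakeService{Name: "web"}, nil, false))                          // true
        fmt.Println(shouldRebuild(&fakeService{Name: "web", DeletionTimestamp: &now}, nil, false)) // false
    }

A non-nil DeletionTimestamp means cascading deletion is already underway, so rebuilding the per-cluster service at that point would only recreate resources the deletion is about to remove.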

@@ -487,6 +487,10 @@ func wantsDNSRecords(service *v1.Service) bool {
 // update DNS records and update the service info with DNS entries to federation apiserver.
 // the function returns any error caught
 func (s *ServiceController) processServiceForCluster(cachedService *cachedService, clusterName string, service *v1.Service, client *kubeclientset.Clientset) error {
+    if service.DeletionTimestamp != nil {
+        glog.V(4).Infof("Service has already been marked for deletion %v", service.Name)
+        return nil
+    }
     glog.V(4).Infof("Process service %s/%s for cluster %s", service.Namespace, service.Name, clusterName)
     // Create or Update k8s Service
     err := s.ensureClusterService(cachedService, clusterName, service, client)
@@ -508,15 +512,19 @@ func (s *ServiceController) updateFederationService(key string, cachedService *c
     }
     // handle available clusters one by one
-    var hasErr bool
+    hasErr := false
+    var wg sync.WaitGroup
     for clusterName, cache := range s.clusterCache.clientMap {
+        wg.Add(1)
         go func(cache *clusterCache, clusterName string) {
+            defer wg.Done()
             err := s.processServiceForCluster(cachedService, clusterName, desiredService, cache.clientset)
             if err != nil {
                 hasErr = true
             }
         }(cache, clusterName)
     }
+    wg.Wait()
     if hasErr {
         // detail error has been dumped inside the loop
         return fmt.Errorf("Service %s/%s was not successfully updated to all clusters", desiredService.Namespace, desiredService.Name), retryable