diff --git a/pkg/controller/service/servicecontroller.go b/pkg/controller/service/servicecontroller.go index 1f385d81fc8..dee6f7207a1 100644 --- a/pkg/controller/service/servicecontroller.go +++ b/pkg/controller/service/servicecontroller.go @@ -23,6 +23,8 @@ import ( "sync" "time" + "reflect" + "github.com/golang/glog" "k8s.io/kubernetes/pkg/api" "k8s.io/kubernetes/pkg/api/errors" @@ -34,7 +36,6 @@ import ( "k8s.io/kubernetes/pkg/fields" "k8s.io/kubernetes/pkg/types" "k8s.io/kubernetes/pkg/util/runtime" - "reflect" ) const ( @@ -198,7 +199,7 @@ func (s *ServiceController) watchServices(serviceQueue *cache.DeltaFIFO) { // Returns an error if processing the delta failed, along with a boolean // indicator of whether the processing should be retried. func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) { - service, ok := delta.Object.(*api.Service) + deltaService, ok := delta.Object.(*api.Service) var namespacedName types.NamespacedName var cachedService *cachedService if !ok { @@ -213,63 +214,66 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) { if !ok { return fmt.Errorf("Service %s not in cache even though the watcher thought it was. Ignoring the deletion.", key), notRetryable } - service = cachedService.lastState - delta.Object = cachedService.lastState - namespacedName = types.NamespacedName{Namespace: service.Namespace, Name: service.Name} + deltaService = cachedService.lastState + delta.Object = deltaService + namespacedName = types.NamespacedName{Namespace: deltaService.Namespace, Name: deltaService.Name} } else { - namespacedName.Namespace = service.Namespace - namespacedName.Name = service.Name + namespacedName.Namespace = deltaService.Namespace + namespacedName.Name = deltaService.Name cachedService = s.cache.getOrCreate(namespacedName.String()) } - glog.V(2).Infof("Got new %s delta for service: %+v", delta.Type, service) + glog.V(2).Infof("Got new %s delta for service: %+v", delta.Type, deltaService) // Ensure that no other goroutine will interfere with our processing of the // service. cachedService.mu.Lock() defer cachedService.mu.Unlock() + // Get the most recent state of the service from the API directly rather than + // trusting the body of the delta. This avoids update re-ordering problems. + // TODO: Handle sync delta types differently rather than doing a get on every + // service every time we sync? + service, err := s.kubeClient.Core().Services(namespacedName.Namespace).Get(namespacedName.Name) + if err != nil && !errors.IsNotFound(err) { + glog.Warningf("Failed to get most recent state of service %v from API (will retry): %v", namespacedName, err) + return err, retryable + } else if errors.IsNotFound(err) { + glog.V(2).Infof("Service %v not found, ensuring load balancer is deleted", namespacedName) + s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer") + err := s.balancer.EnsureLoadBalancerDeleted(s.loadBalancerName(deltaService), s.zone.Region) + if err != nil { + message := "Error deleting load balancer (will retry): " + err.Error() + s.eventRecorder.Event(deltaService, api.EventTypeWarning, "DeletingLoadBalancerFailed", message) + return err, retryable + } + s.eventRecorder.Event(deltaService, api.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer") + s.cache.delete(namespacedName.String()) + return nil, notRetryable + } + // Update the cached service (used above for populating synthetic deletes) cachedService.lastState = service - // TODO: Handle added, updated, and sync differently? - switch delta.Type { - case cache.Added: - fallthrough - case cache.Updated: - fallthrough - case cache.Sync: - err, retry := s.createLoadBalancerIfNeeded(namespacedName, service, cachedService.appliedState) - if err != nil { - message := "Error creating load balancer" - if retry { - message += " (will retry): " - } else { - message += " (will not retry): " - } - message += err.Error() - s.eventRecorder.Event(service, api.EventTypeWarning, "CreatingLoadBalancerFailed", message) + err, retry := s.createLoadBalancerIfNeeded(namespacedName, service, cachedService.appliedState) + if err != nil { + message := "Error creating load balancer" + if retry { + message += " (will retry): " + } else { + message += " (will not retry): " + } + message += err.Error() + s.eventRecorder.Event(service, api.EventTypeWarning, "CreatingLoadBalancerFailed", message) - return err, retry - } - // Always update the cache upon success. - // NOTE: Since we update the cached service if and only if we successfully - // processed it, a cached service being nil implies that it hasn't yet - // been successfully processed. - cachedService.appliedState = service - s.cache.set(namespacedName.String(), cachedService) - case cache.Deleted: - s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer") - err := s.balancer.EnsureLoadBalancerDeleted(s.loadBalancerName(service), s.zone.Region) - if err != nil { - message := "Error deleting load balancer (will retry): " + err.Error() - s.eventRecorder.Event(service, api.EventTypeWarning, "DeletingLoadBalancerFailed", message) - return err, retryable - } - s.eventRecorder.Event(service, api.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer") - s.cache.delete(namespacedName.String()) - default: - glog.Errorf("Unexpected delta type: %v", delta.Type) + return err, retry } + // Always update the cache upon success. + // NOTE: Since we update the cached service if and only if we successfully + // processed it, a cached service being nil implies that it hasn't yet + // been successfully processed. + cachedService.appliedState = service + s.cache.set(namespacedName.String(), cachedService) + return nil, notRetryable } @@ -277,7 +281,7 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) { // should be retried. func (s *ServiceController) createLoadBalancerIfNeeded(namespacedName types.NamespacedName, service, appliedState *api.Service) (error, bool) { if appliedState != nil && !s.needsUpdate(appliedState, service) { - glog.Infof("LB already exists and doesn't need update for service %s", namespacedName) + glog.Infof("LB doesn't need update for service %s", namespacedName) return nil, notRetryable }