From 1561fce81ce0b9bf9239d2249cd447390d5018e2 Mon Sep 17 00:00:00 2001 From: Justin Santa Barbara Date: Tue, 16 Jun 2015 18:59:03 -0400 Subject: [PATCH] servicecontroller: last state applied to LB vs last state seen We need the last state seen for interpreting the change-stream, separately we need to track the last state we successfully applied to the load balancer. --- .../servicecontroller/servicecontroller.go | 21 ++++++++++++------- .../servicecontroller_test.go | 2 +- 2 files changed, 15 insertions(+), 8 deletions(-) diff --git a/pkg/cloudprovider/servicecontroller/servicecontroller.go b/pkg/cloudprovider/servicecontroller/servicecontroller.go index 3e4d5e9d099..a13c867b2e5 100644 --- a/pkg/cloudprovider/servicecontroller/servicecontroller.go +++ b/pkg/cloudprovider/servicecontroller/servicecontroller.go @@ -52,7 +52,11 @@ const ( ) type cachedService struct { - service *api.Service + // The last-known state of the service + lastState *api.Service + // The state as successfully applied to the load balancer + appliedState *api.Service + // Ensures only one goroutine can operate on this service at any given time. mu sync.Mutex } @@ -191,8 +195,8 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) { if !ok { return fmt.Errorf("Service %s not in cache even though the watcher thought it was. Ignoring the deletion.", key), notRetryable } - service = cachedService.service - delta.Object = cachedService.service + service = cachedService.lastState + delta.Object = cachedService.lastState namespacedName = types.NamespacedName{service.Namespace, service.Name} } else { namespacedName.Namespace = service.Namespace @@ -206,6 +210,9 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) { cachedService.mu.Lock() defer cachedService.mu.Unlock() + // Update the cached service (used above for populating synthetic deletes) + cachedService.lastState = service + // TODO: Handle added, updated, and sync differently? switch delta.Type { case cache.Added: @@ -213,7 +220,7 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) { case cache.Updated: fallthrough case cache.Sync: - err, retry := s.createLoadBalancerIfNeeded(namespacedName, service, cachedService.service) + err, retry := s.createLoadBalancerIfNeeded(namespacedName, service, cachedService.appliedState) if err != nil { s.eventRecorder.Event(service, "creating loadbalancer failed", err.Error()) return err, retry @@ -222,7 +229,7 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) { // NOTE: Since we update the cached service if and only if we successully // processed it, a cached service being nil implies that it hasn't yet // been successfully processed. - cachedService.service = service + cachedService.appliedState = service s.cache.set(namespacedName.String(), cachedService) case cache.Deleted: err := s.balancer.EnsureTCPLoadBalancerDeleted(s.loadBalancerName(service), s.zone.Region) @@ -609,10 +616,10 @@ func (s *ServiceController) updateLoadBalancerHosts(services []*cachedService, h // with by the load balancer reconciler. We can trust the load balancer // reconciler to ensure the service's load balancer is created to target // the correct nodes. - if service.service == nil { + if service.appliedState == nil { return } - if err := s.lockedUpdateLoadBalancerHosts(service.service, hosts); err != nil { + if err := s.lockedUpdateLoadBalancerHosts(service.appliedState, hosts); err != nil { glog.Errorf("External error while updating TCP load balancer: %v.", err) servicesToRetry = append(servicesToRetry, service) } diff --git a/pkg/cloudprovider/servicecontroller/servicecontroller_test.go b/pkg/cloudprovider/servicecontroller/servicecontroller_test.go index d39112219f4..d62b3a47162 100644 --- a/pkg/cloudprovider/servicecontroller/servicecontroller_test.go +++ b/pkg/cloudprovider/servicecontroller/servicecontroller_test.go @@ -206,7 +206,7 @@ func TestUpdateNodesInExternalLoadBalancer(t *testing.T) { var services []*cachedService for _, service := range item.services { - services = append(services, &cachedService{service: service}) + services = append(services, &cachedService{lastState: service, appliedState: service}) } if err := controller.updateLoadBalancerHosts(services, hosts); err != nil { t.Errorf("unexpected error: %v", err)