From 5b3bb56a4f851ce88d52d702e09a550e29802554 Mon Sep 17 00:00:00 2001
From: Justin Santa Barbara
Date: Thu, 25 Feb 2016 09:51:11 -0500
Subject: [PATCH] Apply exponential backoff in servicecontroller before retrying

Issue #21952
---
 pkg/controller/service/servicecontroller.go | 64 +++++++++++++++------
 1 file changed, 48 insertions(+), 16 deletions(-)

diff --git a/pkg/controller/service/servicecontroller.go b/pkg/controller/service/servicecontroller.go
index 508d4d5ca9d..e8135247bd9 100644
--- a/pkg/controller/service/servicecontroller.go
+++ b/pkg/controller/service/servicecontroller.go
@@ -44,13 +44,16 @@ const (
 	// How long to wait before retrying the processing of a service change.
 	// If this changes, the sleep in hack/jenkins/e2e.sh before downing a cluster
 	// should be changed appropriately.
-	processingRetryInterval = 5 * time.Second
+	minRetryDelay = 5 * time.Second
+	maxRetryDelay = 300 * time.Second
 
 	clientRetryCount    = 5
 	clientRetryInterval = 5 * time.Second
 
 	retryable    = true
 	notRetryable = false
+
+	doNotRetry = time.Duration(0)
 )
 
 type cachedService struct {
@@ -61,6 +64,9 @@ type cachedService struct {
 
 	// Ensures only one goroutine can operate on this service at any given time.
 	mu sync.Mutex
+
+	// Controls error back-off
+	lastRetryDelay time.Duration
 }
 
 type serviceCache struct {
@@ -184,21 +190,26 @@ func (s *ServiceController) watchServices(serviceQueue *cache.DeltaFIFO) {
 			glog.Errorf("Received nil delta from watcher queue.")
 			continue
 		}
-		err, shouldRetry := s.processDelta(delta)
-		if shouldRetry {
+		err, retryDelay := s.processDelta(delta)
+		if retryDelay != 0 {
 			// Add the failed service back to the queue so we'll retry it.
-			glog.Errorf("Failed to process service delta. Retrying: %v", err)
-			time.Sleep(processingRetryInterval)
-			serviceQueue.AddIfNotPresent(deltas)
+			glog.Errorf("Failed to process service delta. Retrying in %s: %v", retryDelay, err)
+			go func(deltas cache.Deltas, delay time.Duration) {
+				time.Sleep(delay)
+				if err := serviceQueue.AddIfNotPresent(deltas); err != nil {
+					glog.Errorf("Error requeuing service delta - will not retry: %v", err)
+				}
+			}(deltas, retryDelay)
 		} else if err != nil {
 			runtime.HandleError(fmt.Errorf("Failed to process service delta. Not retrying: %v", err))
 		}
 	}
 }
 
-// Returns an error if processing the delta failed, along with a boolean
-// indicator of whether the processing should be retried.
-func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
+// Returns an error if processing the delta failed, along with a time.Duration
+// indicating whether processing should be retried; zero means no-retry; otherwise
+// we should retry in that Duration.
+func (s *ServiceController) processDelta(delta *cache.Delta) (error, time.Duration) {
 	deltaService, ok := delta.Object.(*api.Service)
 	var namespacedName types.NamespacedName
 	var cachedService *cachedService
@@ -208,11 +219,11 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
 		// cache for deleting.
 		key, ok := delta.Object.(cache.DeletedFinalStateUnknown)
 		if !ok {
-			return fmt.Errorf("Delta contained object that wasn't a service or a deleted key: %+v", delta), notRetryable
+			return fmt.Errorf("Delta contained object that wasn't a service or a deleted key: %+v", delta), doNotRetry
 		}
 		cachedService, ok = s.cache.get(key.Key)
 		if !ok {
-			return fmt.Errorf("Service %s not in cache even though the watcher thought it was. Ignoring the deletion.", key), notRetryable
+			return fmt.Errorf("Service %s not in cache even though the watcher thought it was. Ignoring the deletion.", key), doNotRetry
 		}
 		deltaService = cachedService.lastState
 		delta.Object = deltaService
@@ -236,7 +247,7 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
 	service, err := s.kubeClient.Core().Services(namespacedName.Namespace).Get(namespacedName.Name)
 	if err != nil && !errors.IsNotFound(err) {
 		glog.Warningf("Failed to get most recent state of service %v from API (will retry): %v", namespacedName, err)
-		return err, retryable
+		return err, cachedService.nextRetryDelay()
 	} else if errors.IsNotFound(err) {
 		glog.V(2).Infof("Service %v not found, ensuring load balancer is deleted", namespacedName)
 		s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
@@ -244,11 +255,13 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
 		if err != nil {
 			message := "Error deleting load balancer (will retry): " + err.Error()
 			s.eventRecorder.Event(deltaService, api.EventTypeWarning, "DeletingLoadBalancerFailed", message)
-			return err, retryable
+			return err, cachedService.nextRetryDelay()
 		}
 		s.eventRecorder.Event(deltaService, api.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
 		s.cache.delete(namespacedName.String())
-		return nil, notRetryable
+
+		cachedService.resetRetryDelay()
+		return nil, doNotRetry
 	}
 
 	// Update the cached service (used above for populating synthetic deletes)
@@ -265,7 +278,7 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
 		message += err.Error()
 		s.eventRecorder.Event(service, api.EventTypeWarning, "CreatingLoadBalancerFailed", message)
-		return err, retry
+		return err, cachedService.nextRetryDelay()
 	}
 	// Always update the cache upon success.
 	// NOTE: Since we update the cached service if and only if we successfully
@@ -274,7 +287,8 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
 	cachedService.appliedState = service
 	s.cache.set(namespacedName.String(), cachedService)
 
-	return nil, notRetryable
+	cachedService.resetRetryDelay()
+	return nil, doNotRetry
 }
 
 // Returns whatever error occurred along with a boolean indicator of whether it
@@ -738,3 +752,21 @@ func wantsLoadBalancer(service *api.Service) bool {
 func loadBalancerIPsAreEqual(oldService, newService *api.Service) bool {
 	return oldService.Spec.LoadBalancerIP == newService.Spec.LoadBalancerIP
 }
+
+// Computes the next retry, using exponential backoff
+// mutex must be held.
+func (s *cachedService) nextRetryDelay() time.Duration {
+	s.lastRetryDelay = s.lastRetryDelay * 2
+	if s.lastRetryDelay < minRetryDelay {
+		s.lastRetryDelay = minRetryDelay
+	}
+	if s.lastRetryDelay > maxRetryDelay {
+		s.lastRetryDelay = maxRetryDelay
+	}
+	return s.lastRetryDelay
+}
+
+// Resets the retry exponential backoff. mutex must be held.
+func (s *cachedService) resetRetryDelay() {
+	s.lastRetryDelay = time.Duration(0)
+}
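
Note on the resulting behavior (illustration only, not part of the patch): each cachedService now carries its own lastRetryDelay, which nextRetryDelay() doubles on every consecutive failure and clamps to the [minRetryDelay, maxRetryDelay] range of 5s to 300s, while resetRetryDelay() clears it after a successful sync or delete. The standalone Go sketch below mirrors that arithmetic so the delay progression is easy to see; the backoff type and the main function are hypothetical stand-ins, not code from the controller.

package main

import (
	"fmt"
	"time"
)

// Constants copied from the patch.
const (
	minRetryDelay = 5 * time.Second
	maxRetryDelay = 300 * time.Second
)

// backoff is a stand-in for the lastRetryDelay field the patch adds to cachedService.
type backoff struct {
	lastRetryDelay time.Duration
}

// nextRetryDelay mirrors cachedService.nextRetryDelay: double the previous
// delay, then clamp it to [minRetryDelay, maxRetryDelay].
func (b *backoff) nextRetryDelay() time.Duration {
	b.lastRetryDelay = b.lastRetryDelay * 2
	if b.lastRetryDelay < minRetryDelay {
		b.lastRetryDelay = minRetryDelay
	}
	if b.lastRetryDelay > maxRetryDelay {
		b.lastRetryDelay = maxRetryDelay
	}
	return b.lastRetryDelay
}

// resetRetryDelay mirrors cachedService.resetRetryDelay: after a success the
// next failure starts over at minRetryDelay.
func (b *backoff) resetRetryDelay() {
	b.lastRetryDelay = 0
}

func main() {
	var b backoff
	for i := 0; i < 8; i++ {
		fmt.Println(b.nextRetryDelay()) // 5s, 10s, 20s, 40s, 1m20s, 2m40s, 5m0s, 5m0s
	}
	b.resetRetryDelay()
	fmt.Println(b.nextRetryDelay()) // back to 5s
}

Because the requeue in watchServices now happens inside a goroutine that sleeps for the computed delay, the watch loop itself is no longer blocked for the retry interval the way it was with the old time.Sleep(processingRetryInterval).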