diff --git a/pkg/controller/service/service_controller.go b/pkg/controller/service/service_controller.go
index e07d9f44c0c..d912fad9a9a 100644
--- a/pkg/controller/service/service_controller.go
+++ b/pkg/controller/service/service_controller.go
@@ -60,11 +60,6 @@ const (
 	clientRetryCount    = 5
 	clientRetryInterval = 5 * time.Second
 
-	retryable    = true
-	notRetryable = false
-
-	doNotRetry = time.Duration(0)
-
 	// LabelNodeRoleMaster specifies that a node is a master
 	// It's copied over to kubeadm until it's merged in core: https://github.com/kubernetes/kubernetes/pull/39112
 	LabelNodeRoleMaster = "node-role.kubernetes.io/master"
@@ -77,8 +72,6 @@ const (
 type cachedService struct {
 	// The cached state of the service
 	state *v1.Service
-	// Controls error back-off
-	lastRetryDelay time.Duration
 }
 
 type serviceCache struct {
@@ -86,6 +79,8 @@ type serviceCache struct {
 	serviceMap map[string]*cachedService
 }
 
+// ServiceController keeps cloud provider service resources
+// (like load balancers) in sync with the registry.
 type ServiceController struct {
 	cloud      cloudprovider.Interface
 	knownHosts []*v1.Node
@@ -101,7 +96,7 @@ type ServiceController struct {
 	nodeLister       corelisters.NodeLister
 	nodeListerSynced cache.InformerSynced
 	// services that need to be synced
-	workingQueue workqueue.DelayingInterface
+	queue workqueue.RateLimitingInterface
 }
 
 // New returns a new service controller to keep cloud provider service resources
@@ -134,7 +129,7 @@ func New(
 		eventRecorder:    recorder,
 		nodeLister:       nodeInformer.Lister(),
 		nodeListerSynced: nodeInformer.Informer().HasSynced,
-		workingQueue:     workqueue.NewNamedDelayingQueue("service"),
+		queue:            workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
 	}
 
 	serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
@@ -167,7 +162,7 @@ func (s *ServiceController) enqueueService(obj interface{}) {
 		glog.Errorf("Couldn't get key for object %#v: %v", obj, err)
 		return
 	}
-	s.workingQueue.Add(key)
+	s.queue.Add(key)
 }
 
 // Run starts a background goroutine that watches for changes to services that
@@ -182,7 +177,7 @@ func (s *ServiceController) enqueueService(obj interface{}) {
 // object.
 func (s *ServiceController) Run(stopCh <-chan struct{}, workers int) {
 	defer runtime.HandleCrash()
-	defer s.workingQueue.ShutDown()
+	defer s.queue.ShutDown()
 
 	glog.Info("Starting service controller")
 	defer glog.Info("Shutting down service controller")
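Reviewer note: the `NewItemExponentialFailureRateLimiter` installed above reproduces the back-off sequence that the hand-rolled `nextRetryDelay`/`resetRetryDelay` pair (removed later in this diff) used to compute per service. A standalone sketch of that behavior — the 5s/300s values mirror what this package's `minRetryDelay`/`maxRetryDelay` constants are believed to be, but they are elided from this diff, so treat them as an assumption:

```go
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Assumed values for minRetryDelay/maxRetryDelay (not shown in the diff).
	limiter := workqueue.NewItemExponentialFailureRateLimiter(5*time.Second, 300*time.Second)

	// Every failure of the same key doubles its delay: 5s, 10s, 20s, ...
	// capped at the max -- the sequence nextRetryDelay() computed by hand.
	for i := 0; i < 8; i++ {
		fmt.Println(limiter.When("default/my-service"))
	}

	// Forget drops the key's failure history, the job resetRetryDelay() did.
	limiter.Forget("default/my-service")
	fmt.Println(limiter.When("default/my-service")) // back to 5s
}
```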
@@ -203,21 +198,28 @@ func (s *ServiceController) Run(stopCh <-chan struct{}, workers int) {
 // worker runs a worker thread that just dequeues items, processes them, and marks them done.
 // It enforces that the syncHandler is never invoked concurrently with the same key.
 func (s *ServiceController) worker() {
-	for {
-		func() {
-			key, quit := s.workingQueue.Get()
-			if quit {
-				return
-			}
-			defer s.workingQueue.Done(key)
-			err := s.syncService(key.(string))
-			if err != nil {
-				glog.Errorf("Error syncing service: %v", err)
-			}
-		}()
+	for s.processNextWorkItem() {
 	}
 }
 
+func (s *ServiceController) processNextWorkItem() bool {
+	key, quit := s.queue.Get()
+	if quit {
+		return false
+	}
+	defer s.queue.Done(key)
+
+	err := s.syncService(key.(string))
+	if err == nil {
+		s.queue.Forget(key)
+		return true
+	}
+
+	runtime.HandleError(fmt.Errorf("error processing service %v (will retry): %v", key, err))
+	s.queue.AddRateLimited(key)
+	return true
+}
+
 func (s *ServiceController) init() error {
 	if s.cloud == nil {
 		return fmt.Errorf("WARNING: no cloud provider provided, services of type LoadBalancer will fail")
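For reviewers unfamiliar with the rate-limiting queue contract that `processNextWorkItem` now relies on: `Get`/`Done` bracket each item, `AddRateLimited` re-enqueues it after the limiter's delay, and `Forget` must be called on success so the key's failure count resets. A minimal, self-contained illustration (the key name, delays, and failure count are made up for the demo):

```go
package main

import (
	"errors"
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Millisecond delays keep the demo fast; the controller uses seconds.
	queue := workqueue.NewNamedRateLimitingQueue(
		workqueue.NewItemExponentialFailureRateLimiter(time.Millisecond, time.Second), "demo")
	defer queue.ShutDown()

	failures := 0
	sync := func(key string) error { // stand-in for syncService
		if failures < 2 {
			failures++
			return errors.New("transient cloud error")
		}
		return nil
	}

	queue.Add("default/my-service")
	for i := 0; i < 3; i++ {
		key, quit := queue.Get() // blocks until the (possibly delayed) re-add
		if quit {
			return
		}
		if err := sync(key.(string)); err != nil {
			queue.AddRateLimited(key) // schedule a retry with backoff
			fmt.Printf("attempt %d failed, requeues so far: %d\n", i+1, queue.NumRequeues(key))
		} else {
			queue.Forget(key) // success: wipe the key's backoff history
			fmt.Printf("attempt %d succeeded\n", i+1)
		}
		queue.Done(key)
	}
}
```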
@@ -235,31 +237,19 @@ func (s *ServiceController) init() error {
-// Returns an error if processing the service update failed, along with a time.Duration
-// indicating whether processing should be retried; zero means no-retry; otherwise
-// we should retry in that Duration.
-func (s *ServiceController) processServiceUpdate(cachedService *cachedService, service *v1.Service, key string) (error, time.Duration) {
+// Returns an error if processing the service update failed.
+func (s *ServiceController) processServiceUpdate(cachedService *cachedService, service *v1.Service, key string) error {
 	if cachedService.state != nil {
 		if cachedService.state.UID != service.UID {
-			err, retry := s.processLoadBalancerDelete(cachedService, key)
+			err := s.processLoadBalancerDelete(cachedService, key)
 			if err != nil {
-				return err, retry
+				return err
 			}
 		}
 	}
 	// cache the service, we need the info for service deletion
 	cachedService.state = service
-	err, retry := s.createLoadBalancerIfNeeded(key, service)
+	err := s.createLoadBalancerIfNeeded(key, service)
 	if err != nil {
-		message := "Error creating load balancer"
-		var retryToReturn time.Duration
-		if retry {
-			message += " (will retry): "
-			retryToReturn = cachedService.nextRetryDelay()
-		} else {
-			message += " (will not retry): "
-			retryToReturn = doNotRetry
-		}
-		message += err.Error()
-		s.eventRecorder.Event(service, v1.EventTypeWarning, "CreatingLoadBalancerFailed", message)
-		return err, retryToReturn
+		s.eventRecorder.Eventf(service, v1.EventTypeWarning, "CreatingLoadBalancerFailed", "Error creating load balancer (will retry): %v", err)
+		return err
 	}
 	// Always update the cache upon success.
 	// NOTE: Since we update the cached service if and only if we successfully
@@ -267,13 +259,11 @@ func (s *ServiceController) processServiceUpdate(cachedService *cachedService, s
 	// been successfully processed.
 	s.cache.set(key, cachedService)
 
-	cachedService.resetRetryDelay()
-	return nil, doNotRetry
+	return nil
 }
 
-// Returns whatever error occurred along with a boolean indicator of whether it
-// should be retried.
-func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *v1.Service) (error, bool) {
+// Returns an error if it failed to ensure the load balancer.
+func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *v1.Service) error {
 	// Note: It is safe to just call EnsureLoadBalancer.  But, on some clouds that requires a delete & create,
 	// which may involve service interruption.  Also, we would like user-friendly events.
@@ -285,13 +276,13 @@ func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *v1.S
 	if !wantsLoadBalancer(service) {
 		_, exists, err := s.balancer.GetLoadBalancer(s.clusterName, service)
 		if err != nil {
-			return fmt.Errorf("error getting LB for service %s: %v", key, err), retryable
+			return fmt.Errorf("error getting LB for service %s: %v", key, err)
 		}
 		if exists {
 			glog.Infof("Deleting existing load balancer for service %s that no longer needs a load balancer.", key)
 			s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
 			if err := s.balancer.EnsureLoadBalancerDeleted(s.clusterName, service); err != nil {
-				return err, retryable
+				return err
 			}
 			s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
 		}
@@ -305,7 +296,7 @@ func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *v1.S
 		s.eventRecorder.Event(service, v1.EventTypeNormal, "EnsuringLoadBalancer", "Ensuring load balancer")
 		newState, err = s.ensureLoadBalancer(service)
 		if err != nil {
-			return fmt.Errorf("failed to ensure load balancer for service %s: %v", key, err), retryable
+			return fmt.Errorf("failed to ensure load balancer for service %s: %v", key, err)
 		}
 		s.eventRecorder.Event(service, v1.EventTypeNormal, "EnsuredLoadBalancer", "Ensured load balancer")
 	}
@@ -320,13 +311,14 @@ func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *v1.S
 
 		service.Status.LoadBalancer = *newState
 		if err := s.persistUpdate(service); err != nil {
-			return fmt.Errorf("failed to persist updated status to apiserver, even after retries. Giving up: %v", err), notRetryable
+			runtime.HandleError(fmt.Errorf("failed to persist service %q updated status to apiserver, even after retries. Giving up: %v", key, err))
+			return nil
 		}
 	} else {
 		glog.V(2).Infof("Not persisting unchanged LoadBalancerStatus for service %s to registry.", key)
 	}
 
-	return nil, notRetryable
+	return nil
 }
 
 func (s *ServiceController) persistUpdate(service *v1.Service) error {
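Both the create path above and the delete path below gate on `wantsLoadBalancer`, which this diff leaves untouched. For anyone reading the patch in isolation, it is presumably just the service-type check — reproduced here as an assumption about the surrounding file, not as part of the change:

```go
// Assumed shape of the unchanged helper (verify against the actual file).
func wantsLoadBalancer(service *v1.Service) bool {
	return service.Spec.Type == v1.ServiceTypeLoadBalancer
}
```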
@@ -703,31 +695,11 @@ func loadBalancerIPsAreEqual(oldService, newService *v1.Service) bool {
 	return oldService.Spec.LoadBalancerIP == newService.Spec.LoadBalancerIP
 }
 
-// Computes the next retry, using exponential backoff
-// mutex must be held.
-func (s *cachedService) nextRetryDelay() time.Duration {
-	s.lastRetryDelay = s.lastRetryDelay * 2
-	if s.lastRetryDelay < minRetryDelay {
-		s.lastRetryDelay = minRetryDelay
-	}
-	if s.lastRetryDelay > maxRetryDelay {
-		s.lastRetryDelay = maxRetryDelay
-	}
-	return s.lastRetryDelay
-}
-
-// Resets the retry exponential backoff. mutex must be held.
-func (s *cachedService) resetRetryDelay() {
-	s.lastRetryDelay = time.Duration(0)
-}
-
-// syncService will sync the Service with the given key if it has had its expectations fulfilled,
-// meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be
-// invoked concurrently with the same key.
+// syncService will sync the Service with the given key.
+// This function is not meant to be invoked concurrently with the same key.
 func (s *ServiceController) syncService(key string) error {
 	startTime := time.Now()
 	var cachedService *cachedService
-	var retryDelay time.Duration
 	defer func() {
 		glog.V(4).Infof("Finished syncing service %q (%v)", key, time.Since(startTime))
 	}()
@@ -742,59 +715,42 @@ func (s *ServiceController) syncService(key string) error {
 	switch {
 	case errors.IsNotFound(err):
 		// service absence in store means watcher caught the deletion, ensure LB info is cleaned
-		glog.Infof("Service has been deleted %v", key)
-		err, retryDelay = s.processServiceDeletion(key)
+		glog.Infof("Service has been deleted %v. Attempting to clean up load balancer resources", key)
+		err = s.processServiceDeletion(key)
 	case err != nil:
 		glog.Infof("Unable to retrieve service %v from store: %v", key, err)
-		s.workingQueue.Add(key)
-		return err
 	default:
 		cachedService = s.cache.getOrCreate(key)
-		err, retryDelay = s.processServiceUpdate(cachedService, service, key)
+		err = s.processServiceUpdate(cachedService, service, key)
 	}
 
-	if retryDelay != 0 {
-		// Add the failed service back to the queue so we'll retry it.
-		glog.Errorf("Failed to process service %v. Retrying in %s: %v", key, retryDelay, err)
-		go func(obj interface{}, delay time.Duration) {
-			// put back the service key to working queue, it is possible that more entries of the service
-			// were added into the queue during the delay, but it does not mess as when handling the retry,
-			// it always get the last service info from service store
-			s.workingQueue.AddAfter(obj, delay)
-		}(key, retryDelay)
-	} else if err != nil {
-		runtime.HandleError(fmt.Errorf("failed to process service %v. Not retrying: %v", key, err))
-	}
-	return nil
+	return err
 }
 
-// Returns an error if processing the service deletion failed, along with a time.Duration
-// indicating whether processing should be retried; zero means no-retry; otherwise
-// we should retry after that Duration.
-func (s *ServiceController) processServiceDeletion(key string) (error, time.Duration) {
+// Returns an error if processing the service deletion failed.
+func (s *ServiceController) processServiceDeletion(key string) error {
 	cachedService, ok := s.cache.get(key)
 	if !ok {
-		return fmt.Errorf("service %s not in cache even though the watcher thought it was. Ignoring the deletion", key), doNotRetry
+		glog.Errorf("service %s not in cache even though the watcher thought it was. Ignoring the deletion", key)
+		return nil
 	}
 	return s.processLoadBalancerDelete(cachedService, key)
 }
 
-func (s *ServiceController) processLoadBalancerDelete(cachedService *cachedService, key string) (error, time.Duration) {
+func (s *ServiceController) processLoadBalancerDelete(cachedService *cachedService, key string) error {
 	service := cachedService.state
 	// delete load balancer info only if the service type is LoadBalancer
 	if !wantsLoadBalancer(service) {
-		return nil, doNotRetry
+		return nil
 	}
 	s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
 	err := s.balancer.EnsureLoadBalancerDeleted(s.clusterName, service)
 	if err != nil {
-		message := "Error deleting load balancer (will retry): " + err.Error()
-		s.eventRecorder.Event(service, v1.EventTypeWarning, "DeletingLoadBalancerFailed", message)
-		return err, cachedService.nextRetryDelay()
+		s.eventRecorder.Eventf(service, v1.EventTypeWarning, "DeletingLoadBalancerFailed", "Error deleting load balancer (will retry): %v", err)
+		return err
	}
 	s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
 	s.cache.delete(key)
 
-	cachedService.resetRetryDelay()
-	return nil, doNotRetry
+	return nil
 }
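A side note on the keys flowing through `syncService`: `enqueueService` stores plain "namespace/name" strings, and the elided top of `syncService` (the hunk skips it) presumably splits them back apart before fetching the Service from the store — an assumption here, since those lines are not part of the diff. The standard helper for that round-trip, shown standalone:

```go
package main

import (
	"fmt"

	"k8s.io/client-go/tools/cache"
)

func main() {
	// Queue keys are "namespace/name" strings; syncService is assumed to
	// split them back apart before the store lookup.
	namespace, name, err := cache.SplitMetaNamespaceKey("default/my-service")
	if err != nil {
		panic(err)
	}
	fmt.Printf("namespace=%s name=%s\n", namespace, name)
}
```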
diff --git a/pkg/controller/service/service_controller_test.go b/pkg/controller/service/service_controller_test.go
index 0c4990adb1a..0241cb1d22d 100644
--- a/pkg/controller/service/service_controller_test.go
+++ b/pkg/controller/service/service_controller_test.go
@@ -20,7 +20,6 @@ import (
 	"fmt"
 	"reflect"
 	"testing"
-	"time"
 
 	"k8s.io/api/core/v1"
 	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -129,7 +128,7 @@ func TestCreateExternalLoadBalancer(t *testing.T) {
 
 	for _, item := range table {
 		controller, cloud, client := newController()
-		err, _ := controller.createLoadBalancerIfNeeded("foo/bar", item.service)
+		err := controller.createLoadBalancerIfNeeded("foo/bar", item.service)
 		if !item.expectErr && err != nil {
 			t.Errorf("unexpected error: %v", err)
 		} else if item.expectErr && err == nil {
@@ -320,7 +319,7 @@ func TestProcessServiceUpdate(t *testing.T) {
 		key        string
 		updateFn   func(*v1.Service) *v1.Service //Manipulate the structure
 		svc        *v1.Service
-		expectedFn func(*v1.Service, error, time.Duration) error //Error comparision function
+		expectedFn func(*v1.Service, error) error // Error comparison function
 	}{
 		{
 			testName: "If updating a valid service",
@@ -333,15 +332,8 @@ func TestProcessServiceUpdate(t *testing.T) {
 			updateFn: func(svc *v1.Service) *v1.Service {
 				return svc
 			},
-			expectedFn: func(svc *v1.Service, err error, retryDuration time.Duration) error {
-
-				if err != nil {
-					return err
-				}
-				if retryDuration != doNotRetry {
-					return fmt.Errorf("retryDuration Expected=%v Obtained=%v", doNotRetry, retryDuration)
-				}
-				return nil
+			expectedFn: func(svc *v1.Service, err error) error {
+				return err
 			},
 		},
 		{
@@ -358,9 +350,9 @@ func TestProcessServiceUpdate(t *testing.T) {
 				cachedServiceTest.state = svc
 				controller.cache.set(keyExpected, cachedServiceTest)
 
-				keyGot, quit := controller.workingQueue.Get()
+				keyGot, quit := controller.queue.Get()
 				if quit {
-					t.Fatalf("get no workingQueue element")
+					t.Fatalf("got no queue element")
 				}
 				if keyExpected != keyGot.(string) {
 					t.Fatalf("get service key error, expected: %s, got: %s", keyExpected, keyGot.(string))
@@ -372,20 +364,17 @@ func TestProcessServiceUpdate(t *testing.T) {
 
 				return newService
 			},
-			expectedFn: func(svc *v1.Service, err error, retryDuration time.Duration) error {
+			expectedFn: func(svc *v1.Service, err error) error {
 				if err != nil {
 					return err
 				}
-				if retryDuration != doNotRetry {
-					return fmt.Errorf("retryDuration Expected=%v Obtained=%v", doNotRetry, retryDuration)
-				}
 
 				keyExpected := svc.GetObjectMeta().GetNamespace() + "/" + svc.GetObjectMeta().GetName()
 
 				cachedServiceGot, exist := controller.cache.get(keyExpected)
 				if !exist {
-					return fmt.Errorf("update service error, workingQueue should contain service: %s", keyExpected)
+					return fmt.Errorf("update service error, cache should contain service: %s", keyExpected)
 				}
 				if cachedServiceGot.state.Spec.LoadBalancerIP != newLBIP {
 					return fmt.Errorf("update LoadBalancerIP error, expected: %s, got: %s", newLBIP, cachedServiceGot.state.Spec.LoadBalancerIP)
@@ -398,8 +387,8 @@ func TestProcessServiceUpdate(t *testing.T) {
 	for _, tc := range testCases {
 		newSvc := tc.updateFn(tc.svc)
 		svcCache := controller.cache.getOrCreate(tc.key)
-		obtErr, retryDuration := controller.processServiceUpdate(svcCache, newSvc, tc.key)
-		if err := tc.expectedFn(newSvc, obtErr, retryDuration); err != nil {
+		obtErr := controller.processServiceUpdate(svcCache, newSvc, tc.key)
+		if err := tc.expectedFn(newSvc, obtErr); err != nil {
 			t.Errorf("%v processServiceUpdate() %v", tc.testName, err)
 		}
 	}
@@ -491,33 +480,21 @@ func TestProcessServiceDeletion(t *testing.T) {
 	var controller *ServiceController
 	var cloud *fakecloud.FakeCloud
 
-	//Add a global svcKey name
+	// Add a global svcKey name
 	svcKey := "external-balancer"
 
 	testCases := []struct {
 		testName   string
-		updateFn   func(*ServiceController)                              //Update function used to manupulate srv and controller values
-		expectedFn func(svcErr error, retryDuration time.Duration) error //Function to check if the returned value is expected
+		updateFn   func(*ServiceController) // Update function used to manipulate srv and controller values
+		expectedFn func(svcErr error) error // Function to check if the returned value is expected
 	}{
 		{
 			testName: "If an non-existant service is deleted",
 			updateFn: func(controller *ServiceController) {
-				//Does not do anything
+				// Does not do anything
 			},
-			expectedFn: func(svcErr error, retryDuration time.Duration) error {
-
-				expectedError := "service external-balancer not in cache even though the watcher thought it was. Ignoring the deletion"
-			if svcErr == nil || svcErr.Error() != expectedError {
-				//cannot be nil or Wrong error message
-				return fmt.Errorf("Expected=%v Obtained=%v", expectedError, svcErr)
-			}
-
-			if retryDuration != doNotRetry {
-				//Retry duration should match
-				return fmt.Errorf("RetryDuration Expected=%v Obtained=%v", doNotRetry, retryDuration)
-			}
-
-			return nil
+			expectedFn: func(svcErr error) error {
+				return svcErr
 			},
 		},
 		{
@@ -529,7 +506,7 @@ func TestProcessServiceDeletion(t *testing.T) {
 
 				cloud.Err = fmt.Errorf("Error Deleting the Loadbalancer")
 			},
-			expectedFn: func(svcErr error, retryDuration time.Duration) error {
+			expectedFn: func(svcErr error) error {
 
 				expectedError := "Error Deleting the Loadbalancer"
 
 				if svcErr == nil || svcErr.Error() != expectedError {
 					return fmt.Errorf("Expected=%v Obtained=%v", expectedError, svcErr)
 				}
 
-				if retryDuration != minRetryDelay {
-					return fmt.Errorf("RetryDuration Expected=%v Obtained=%v", minRetryDelay, retryDuration)
-				}
 				return nil
 			},
 		},
@@ -554,21 +528,15 @@ func TestProcessServiceDeletion(t *testing.T) {
 
 				controller.cache.set(svcKey, svc)
 			},
-			expectedFn: func(svcErr error, retryDuration time.Duration) error {
-
+			expectedFn: func(svcErr error) error {
 				if svcErr != nil {
 					return fmt.Errorf("Expected=nil Obtained=%v", svcErr)
 				}
 
-				if retryDuration != doNotRetry {
-					//Retry duration should match
-					return fmt.Errorf("RetryDuration Expected=%v Obtained=%v", doNotRetry, retryDuration)
-				}
-
-				//It should no longer be in the workqueue.
+				// It should no longer be in the cache.
 				_, exist := controller.cache.get(svcKey)
 				if exist {
-					return fmt.Errorf("delete service error, workingQueue should not contain service: %s any more", svcKey)
+					return fmt.Errorf("delete service error, cache should not contain service: %s any more", svcKey)
 				}
 
 				return nil
@@ -580,8 +548,8 @@ func TestProcessServiceDeletion(t *testing.T) {
 		//Create a new controller.
 		controller, cloud, _ = newController()
 		tc.updateFn(controller)
-		obtainedErr, retryDuration := controller.processServiceDeletion(svcKey)
-		if err := tc.expectedFn(obtainedErr, retryDuration); err != nil {
+		obtainedErr := controller.processServiceDeletion(svcKey)
+		if err := tc.expectedFn(obtainedErr); err != nil {
 			t.Errorf("%v processServiceDeletion() %v", tc.testName, err)
 		}
 	}
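With the `retryDuration` assertions gone, none of the updated tests exercise the new backoff path directly. A sketch of a test one could add for the queue semantics alone — it is not part of this patch, touches only the workqueue package, and assumes this file gains a `workqueue` import; the key name is illustrative:

```go
func TestRateLimitedRequeue(t *testing.T) {
	queue := workqueue.NewNamedRateLimitingQueue(
		workqueue.DefaultControllerRateLimiter(), "test")
	defer queue.ShutDown()

	// A failed sync is modeled by AddRateLimited: the failure count grows.
	queue.AddRateLimited("external-balancer")
	if got := queue.NumRequeues("external-balancer"); got != 1 {
		t.Errorf("NumRequeues Expected=1 Obtained=%d", got)
	}

	// A successful sync calls Forget, which must clear the failure count.
	queue.Forget("external-balancer")
	if got := queue.NumRequeues("external-balancer"); got != 0 {
		t.Errorf("NumRequeues after Forget Expected=0 Obtained=%d", got)
	}
}
```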