Merge pull request #56370 from oracle/for/upstream/master/55528

Automatic merge from submit-queue (batch tested with PRs 57868, 58284, 56370, 58400, 58439). If you want to cherry-pick this change to another branch, please follow the instructions [here](https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md).

Refactor service controller to common controller pattern

**What this PR does / why we need it**:

The service controller currently uses a non-standard controller pattern that adds unneeded complexity. This PR moves the service controller to common tooling such as a rate-limited work queue with exponential backoff, rather than tracking retry backoff in the `cachedService` object.
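
As an orientation aid, here is a condensed sketch of the common pattern the controller moves to, built on client-go's rate-limiting work queue. It is adapted from the diff below rather than a drop-in excerpt: the type and constant names are simplified, and the 5s/300s delay values are assumptions of the sketch.

```go
package servicecontroller

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/runtime"
	"k8s.io/client-go/util/workqueue"
)

const (
	// Assumed values for this sketch; the controller defines its own
	// minRetryDelay/maxRetryDelay constants.
	minRetryDelay = 5 * time.Second
	maxRetryDelay = 300 * time.Second
)

type controller struct {
	queue workqueue.RateLimitingInterface
}

func newController() *controller {
	return &controller{
		// Exponential per-key backoff between minRetryDelay and maxRetryDelay,
		// replacing the hand-rolled lastRetryDelay tracking in cachedService.
		queue: workqueue.NewNamedRateLimitingQueue(
			workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay),
			"service"),
	}
}

// processNextWorkItem pulls one key off the queue, syncs it, and either
// clears the key's backoff on success or requeues it with backoff on error.
func (c *controller) processNextWorkItem() bool {
	key, quit := c.queue.Get()
	if quit {
		return false
	}
	defer c.queue.Done(key)

	if err := c.syncService(key.(string)); err != nil {
		runtime.HandleError(fmt.Errorf("error processing service %v (will retry): %v", key, err))
		c.queue.AddRateLimited(key)
		return true
	}
	c.queue.Forget(key)
	return true
}

func (c *controller) syncService(key string) error {
	// The real controller reconciles the cloud load balancer for this key.
	return nil
}
```

The per-key `Forget` call on success is what takes over from the removed `cachedService.resetRetryDelay()` in the old code.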

**Which issue this PR fixes** *(optional, in `fixes #<issue number>(, fixes #<issue_number>, ...)` format, will close that issue when PR gets merged)*: fixes #55528

**Special notes for your reviewer**:

This does change the retry backoff from linear to exponential. The minimum and maximum retry delays stay the same.
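
For a concrete sense of the resulting schedule, here is a small sketch (the 5-second minimum and 300-second maximum are assumptions of the sketch, not values quoted in this diff): client-go's `ItemExponentialFailureRateLimiter` doubles the per-key delay on each consecutive failure, caps it at the maximum, and `Forget` resets it after a successful sync.

```go
package servicecontroller

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/workqueue"
)

// printBackoffSchedule shows how ItemExponentialFailureRateLimiter spaces
// retries for a single failing key: the delay doubles on every failure,
// starting at the base delay and capping at the maximum.
func printBackoffSchedule() {
	rl := workqueue.NewItemExponentialFailureRateLimiter(5*time.Second, 300*time.Second)
	for i := 0; i < 8; i++ {
		// Prints 5s, 10s, 20s, 40s, 1m20s, 2m40s, 5m0s, 5m0s.
		fmt.Println(rl.When("default/my-service"))
	}
	// A successful sync calls Forget, which resets the failure count and
	// therefore the backoff for that key.
	rl.Forget("default/my-service")
}
```

Compared with the previous linear scheme, only the spacing between consecutive retries changes; the bounds are still the configured minimum and maximum.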

There was only one case where we didn't want to retry: updating the service status, since that path already handles retries itself. In that case I moved to the common pattern of using `runtime.HandleError`.
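
A minimal sketch of that single no-retry path follows; the helper names `persistStatus` and `updateLoadBalancerStatus` are hypothetical stand-ins for the controller's real functions.

```go
package servicecontroller

import (
	"fmt"

	"k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/util/runtime"
)

// persistStatus stands in for the controller's real status-update helper,
// which already retries against the apiserver internally.
func persistStatus(service *v1.Service) error {
	return nil
}

// updateLoadBalancerStatus illustrates the one case that is deliberately not
// retried: if the status update still fails after its own retries, report it
// through runtime.HandleError and return nil, so the rate-limited queue does
// not requeue the key.
func updateLoadBalancerStatus(key string, service *v1.Service, newState *v1.LoadBalancerStatus) error {
	service.Status.LoadBalancer = *newState
	if err := persistStatus(service); err != nil {
		runtime.HandleError(fmt.Errorf("failed to persist service %q updated status to apiserver, even after retries. Giving up: %v", key, err))
		return nil
	}
	return nil
}
```

Returning nil here keeps parity with `processNextWorkItem`, which only calls `AddRateLimited` when the sync reports an error.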

**Release note**:

```release-note
NONE
```

/cc @luxas @wlan0 @andrewsykim @thockin 
@prydie 

/sig cluster-lifecycle
/sig networking
/kind cleanup
Kubernetes Submit Queue 2018-01-18 13:11:34 -08:00 committed by GitHub
commit 408a316b1d
2 changed files with 73 additions and 147 deletions

View File

@@ -60,11 +60,6 @@ const (
clientRetryCount = 5
clientRetryInterval = 5 * time.Second
retryable = true
notRetryable = false
doNotRetry = time.Duration(0)
// LabelNodeRoleMaster specifies that a node is a master
// It's copied over to kubeadm until it's merged in core: https://github.com/kubernetes/kubernetes/pull/39112
LabelNodeRoleMaster = "node-role.kubernetes.io/master"
@@ -77,8 +72,6 @@ const (
type cachedService struct {
// The cached state of the service
state *v1.Service
// Controls error back-off
lastRetryDelay time.Duration
}
type serviceCache struct {
@@ -86,6 +79,8 @@ type serviceCache struct {
serviceMap map[string]*cachedService
}
// ServiceController keeps cloud provider service resources
// (like load balancers) in sync with the registry.
type ServiceController struct {
cloud cloudprovider.Interface
knownHosts []*v1.Node
@@ -101,7 +96,7 @@ type ServiceController struct {
nodeLister corelisters.NodeLister
nodeListerSynced cache.InformerSynced
// services that need to be synced
workingQueue workqueue.DelayingInterface
queue workqueue.RateLimitingInterface
}
// New returns a new service controller to keep cloud provider service resources
@@ -134,7 +129,7 @@ func New(
eventRecorder: recorder,
nodeLister: nodeInformer.Lister(),
nodeListerSynced: nodeInformer.Informer().HasSynced,
workingQueue: workqueue.NewNamedDelayingQueue("service"),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewItemExponentialFailureRateLimiter(minRetryDelay, maxRetryDelay), "service"),
}
serviceInformer.Informer().AddEventHandlerWithResyncPeriod(
@@ -167,7 +162,7 @@ func (s *ServiceController) enqueueService(obj interface{}) {
glog.Errorf("Couldn't get key for object %#v: %v", obj, err)
return
}
s.workingQueue.Add(key)
s.queue.Add(key)
}
// Run starts a background goroutine that watches for changes to services that
@@ -182,7 +177,7 @@ func (s *ServiceController) enqueueService(obj interface{}) {
// object.
func (s *ServiceController) Run(stopCh <-chan struct{}, workers int) {
defer runtime.HandleCrash()
defer s.workingQueue.ShutDown()
defer s.queue.ShutDown()
glog.Info("Starting service controller")
defer glog.Info("Shutting down service controller")
@@ -203,21 +198,28 @@ func (s *ServiceController) Run(stopCh <-chan struct{}, workers int) {
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
// It enforces that the syncHandler is never invoked concurrently with the same key.
func (s *ServiceController) worker() {
for {
func() {
key, quit := s.workingQueue.Get()
if quit {
return
}
defer s.workingQueue.Done(key)
err := s.syncService(key.(string))
if err != nil {
glog.Errorf("Error syncing service: %v", err)
}
}()
for s.processNextWorkItem() {
}
}
func (s *ServiceController) processNextWorkItem() bool {
key, quit := s.queue.Get()
if quit {
return false
}
defer s.queue.Done(key)
err := s.syncService(key.(string))
if err == nil {
s.queue.Forget(key)
return true
}
runtime.HandleError(fmt.Errorf("error processing service %v (will retry): %v", key, err))
s.queue.AddRateLimited(key)
return true
}
func (s *ServiceController) init() error {
if s.cloud == nil {
return fmt.Errorf("WARNING: no cloud provider provided, services of type LoadBalancer will fail")
@@ -235,31 +237,21 @@ func (s *ServiceController) init() error {
// Returns an error if processing the service update failed, along with a time.Duration
// indicating whether processing should be retried; zero means no-retry; otherwise
// we should retry in that Duration.
func (s *ServiceController) processServiceUpdate(cachedService *cachedService, service *v1.Service, key string) (error, time.Duration) {
func (s *ServiceController) processServiceUpdate(cachedService *cachedService, service *v1.Service, key string) error {
if cachedService.state != nil {
if cachedService.state.UID != service.UID {
err, retry := s.processLoadBalancerDelete(cachedService, key)
err := s.processLoadBalancerDelete(cachedService, key)
if err != nil {
return err, retry
return err
}
}
}
// cache the service, we need the info for service deletion
cachedService.state = service
err, retry := s.createLoadBalancerIfNeeded(key, service)
err := s.createLoadBalancerIfNeeded(key, service)
if err != nil {
message := "Error creating load balancer"
var retryToReturn time.Duration
if retry {
message += " (will retry): "
retryToReturn = cachedService.nextRetryDelay()
} else {
message += " (will not retry): "
retryToReturn = doNotRetry
}
message += err.Error()
s.eventRecorder.Event(service, v1.EventTypeWarning, "CreatingLoadBalancerFailed", message)
return err, retryToReturn
s.eventRecorder.Eventf(service, v1.EventTypeWarning, "CreatingLoadBalancerFailed", "Error creating load balancer (will retry): %v", err)
return err
}
// Always update the cache upon success.
// NOTE: Since we update the cached service if and only if we successfully
@@ -267,13 +259,12 @@ func (s *ServiceController) processServiceUpdate(cachedService *cachedService, s
// been successfully processed.
s.cache.set(key, cachedService)
cachedService.resetRetryDelay()
return nil, doNotRetry
return nil
}
// Returns whatever error occurred along with a boolean indicator of whether it
// should be retried.
func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *v1.Service) (error, bool) {
func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *v1.Service) error {
// Note: It is safe to just call EnsureLoadBalancer. But, on some clouds that requires a delete & create,
// which may involve service interruption. Also, we would like user-friendly events.
@@ -285,13 +276,13 @@ func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *v1.S
if !wantsLoadBalancer(service) {
_, exists, err := s.balancer.GetLoadBalancer(s.clusterName, service)
if err != nil {
return fmt.Errorf("error getting LB for service %s: %v", key, err), retryable
return fmt.Errorf("error getting LB for service %s: %v", key, err)
}
if exists {
glog.Infof("Deleting existing load balancer for service %s that no longer needs a load balancer.", key)
s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
if err := s.balancer.EnsureLoadBalancerDeleted(s.clusterName, service); err != nil {
return err, retryable
return err
}
s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
}
@@ -305,7 +296,7 @@ func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *v1.S
s.eventRecorder.Event(service, v1.EventTypeNormal, "EnsuringLoadBalancer", "Ensuring load balancer")
newState, err = s.ensureLoadBalancer(service)
if err != nil {
return fmt.Errorf("failed to ensure load balancer for service %s: %v", key, err), retryable
return fmt.Errorf("failed to ensure load balancer for service %s: %v", key, err)
}
s.eventRecorder.Event(service, v1.EventTypeNormal, "EnsuredLoadBalancer", "Ensured load balancer")
}
@@ -320,13 +311,14 @@ func (s *ServiceController) createLoadBalancerIfNeeded(key string, service *v1.S
service.Status.LoadBalancer = *newState
if err := s.persistUpdate(service); err != nil {
return fmt.Errorf("failed to persist updated status to apiserver, even after retries. Giving up: %v", err), notRetryable
runtime.HandleError(fmt.Errorf("failed to persist service %q updated status to apiserver, even after retries. Giving up: %v", key, err))
return nil
}
} else {
glog.V(2).Infof("Not persisting unchanged LoadBalancerStatus for service %s to registry.", key)
}
return nil, notRetryable
return nil
}
func (s *ServiceController) persistUpdate(service *v1.Service) error {
@@ -703,31 +695,12 @@ func loadBalancerIPsAreEqual(oldService, newService *v1.Service) bool {
return oldService.Spec.LoadBalancerIP == newService.Spec.LoadBalancerIP
}
// Computes the next retry, using exponential backoff
// mutex must be held.
func (s *cachedService) nextRetryDelay() time.Duration {
s.lastRetryDelay = s.lastRetryDelay * 2
if s.lastRetryDelay < minRetryDelay {
s.lastRetryDelay = minRetryDelay
}
if s.lastRetryDelay > maxRetryDelay {
s.lastRetryDelay = maxRetryDelay
}
return s.lastRetryDelay
}
// Resets the retry exponential backoff. mutex must be held.
func (s *cachedService) resetRetryDelay() {
s.lastRetryDelay = time.Duration(0)
}
// syncService will sync the Service with the given key if it has had its expectations fulfilled,
// meaning it did not expect to see any more of its pods created or deleted. This function is not meant to be
// invoked concurrently with the same key.
func (s *ServiceController) syncService(key string) error {
startTime := time.Now()
var cachedService *cachedService
var retryDelay time.Duration
defer func() {
glog.V(4).Infof("Finished syncing service %q (%v)", key, time.Since(startTime))
}()
@@ -742,59 +715,44 @@ func (s *ServiceController) syncService(key string) error {
switch {
case errors.IsNotFound(err):
// service absence in store means watcher caught the deletion, ensure LB info is cleaned
glog.Infof("Service has been deleted %v", key)
err, retryDelay = s.processServiceDeletion(key)
glog.Infof("Service has been deleted %v. Attempting to cleanup load balancer resources", key)
err = s.processServiceDeletion(key)
case err != nil:
glog.Infof("Unable to retrieve service %v from store: %v", key, err)
s.workingQueue.Add(key)
return err
default:
cachedService = s.cache.getOrCreate(key)
err, retryDelay = s.processServiceUpdate(cachedService, service, key)
err = s.processServiceUpdate(cachedService, service, key)
}
if retryDelay != 0 {
// Add the failed service back to the queue so we'll retry it.
glog.Errorf("Failed to process service %v. Retrying in %s: %v", key, retryDelay, err)
go func(obj interface{}, delay time.Duration) {
// put back the service key to working queue, it is possible that more entries of the service
// were added into the queue during the delay, but it does not mess as when handling the retry,
// it always get the last service info from service store
s.workingQueue.AddAfter(obj, delay)
}(key, retryDelay)
} else if err != nil {
runtime.HandleError(fmt.Errorf("failed to process service %v. Not retrying: %v", key, err))
}
return nil
return err
}
// Returns an error if processing the service deletion failed, along with a time.Duration
// indicating whether processing should be retried; zero means no-retry; otherwise
// we should retry after that Duration.
func (s *ServiceController) processServiceDeletion(key string) (error, time.Duration) {
func (s *ServiceController) processServiceDeletion(key string) error {
cachedService, ok := s.cache.get(key)
if !ok {
return fmt.Errorf("service %s not in cache even though the watcher thought it was. Ignoring the deletion", key), doNotRetry
glog.Errorf("service %s not in cache even though the watcher thought it was. Ignoring the deletion", key)
return nil
}
return s.processLoadBalancerDelete(cachedService, key)
}
func (s *ServiceController) processLoadBalancerDelete(cachedService *cachedService, key string) (error, time.Duration) {
func (s *ServiceController) processLoadBalancerDelete(cachedService *cachedService, key string) error {
service := cachedService.state
// delete load balancer info only if the service type is LoadBalancer
if !wantsLoadBalancer(service) {
return nil, doNotRetry
return nil
}
s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
err := s.balancer.EnsureLoadBalancerDeleted(s.clusterName, service)
if err != nil {
message := "Error deleting load balancer (will retry): " + err.Error()
s.eventRecorder.Event(service, v1.EventTypeWarning, "DeletingLoadBalancerFailed", message)
return err, cachedService.nextRetryDelay()
s.eventRecorder.Eventf(service, v1.EventTypeWarning, "DeletingLoadBalancerFailed", "Error deleting load balancer (will retry): %v", err)
return err
}
s.eventRecorder.Event(service, v1.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
s.cache.delete(key)
cachedService.resetRetryDelay()
return nil, doNotRetry
return nil
}

View File

@@ -20,7 +20,6 @@ import (
"fmt"
"reflect"
"testing"
"time"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@@ -129,7 +128,7 @@ func TestCreateExternalLoadBalancer(t *testing.T) {
for _, item := range table {
controller, cloud, client := newController()
err, _ := controller.createLoadBalancerIfNeeded("foo/bar", item.service)
err := controller.createLoadBalancerIfNeeded("foo/bar", item.service)
if !item.expectErr && err != nil {
t.Errorf("unexpected error: %v", err)
} else if item.expectErr && err == nil {
@@ -320,7 +319,7 @@ func TestProcessServiceUpdate(t *testing.T) {
key string
updateFn func(*v1.Service) *v1.Service //Manipulate the structure
svc *v1.Service
expectedFn func(*v1.Service, error, time.Duration) error //Error comparison function
expectedFn func(*v1.Service, error) error //Error comparison function
}{
{
testName: "If updating a valid service",
@@ -333,15 +332,8 @@ func TestProcessServiceUpdate(t *testing.T) {
return svc
},
expectedFn: func(svc *v1.Service, err error, retryDuration time.Duration) error {
if err != nil {
return err
}
if retryDuration != doNotRetry {
return fmt.Errorf("retryDuration Expected=%v Obtained=%v", doNotRetry, retryDuration)
}
return nil
expectedFn: func(svc *v1.Service, err error) error {
return err
},
},
{
@@ -358,9 +350,9 @@ func TestProcessServiceUpdate(t *testing.T) {
cachedServiceTest.state = svc
controller.cache.set(keyExpected, cachedServiceTest)
keyGot, quit := controller.workingQueue.Get()
keyGot, quit := controller.queue.Get()
if quit {
t.Fatalf("get no workingQueue element")
t.Fatalf("get no queue element")
}
if keyExpected != keyGot.(string) {
t.Fatalf("get service key error, expected: %s, got: %s", keyExpected, keyGot.(string))
@@ -372,20 +364,17 @@ func TestProcessServiceUpdate(t *testing.T) {
return newService
},
expectedFn: func(svc *v1.Service, err error, retryDuration time.Duration) error {
expectedFn: func(svc *v1.Service, err error) error {
if err != nil {
return err
}
if retryDuration != doNotRetry {
return fmt.Errorf("retryDuration Expected=%v Obtained=%v", doNotRetry, retryDuration)
}
keyExpected := svc.GetObjectMeta().GetNamespace() + "/" + svc.GetObjectMeta().GetName()
cachedServiceGot, exist := controller.cache.get(keyExpected)
if !exist {
return fmt.Errorf("update service error, workingQueue should contain service: %s", keyExpected)
return fmt.Errorf("update service error, queue should contain service: %s", keyExpected)
}
if cachedServiceGot.state.Spec.LoadBalancerIP != newLBIP {
return fmt.Errorf("update LoadBalancerIP error, expected: %s, got: %s", newLBIP, cachedServiceGot.state.Spec.LoadBalancerIP)
@@ -398,8 +387,8 @@ func TestProcessServiceUpdate(t *testing.T) {
for _, tc := range testCases {
newSvc := tc.updateFn(tc.svc)
svcCache := controller.cache.getOrCreate(tc.key)
obtErr, retryDuration := controller.processServiceUpdate(svcCache, newSvc, tc.key)
if err := tc.expectedFn(newSvc, obtErr, retryDuration); err != nil {
obtErr := controller.processServiceUpdate(svcCache, newSvc, tc.key)
if err := tc.expectedFn(newSvc, obtErr); err != nil {
t.Errorf("%v processServiceUpdate() %v", tc.testName, err)
}
}
@@ -491,33 +480,21 @@ func TestProcessServiceDeletion(t *testing.T) {
var controller *ServiceController
var cloud *fakecloud.FakeCloud
//Add a global svcKey name
// Add a global svcKey name
svcKey := "external-balancer"
testCases := []struct {
testName string
updateFn func(*ServiceController) //Update function used to manipulate srv and controller values
expectedFn func(svcErr error, retryDuration time.Duration) error //Function to check if the returned value is expected
updateFn func(*ServiceController) // Update function used to manipulate srv and controller values
expectedFn func(svcErr error) error // Function to check if the returned value is expected
}{
{
testName: "If an non-existant service is deleted",
updateFn: func(controller *ServiceController) {
//Does not do anything
// Does not do anything
},
expectedFn: func(svcErr error, retryDuration time.Duration) error {
expectedError := "service external-balancer not in cache even though the watcher thought it was. Ignoring the deletion"
if svcErr == nil || svcErr.Error() != expectedError {
//cannot be nil or Wrong error message
return fmt.Errorf("Expected=%v Obtained=%v", expectedError, svcErr)
}
if retryDuration != doNotRetry {
//Retry duration should match
return fmt.Errorf("RetryDuration Expected=%v Obtained=%v", doNotRetry, retryDuration)
}
return nil
expectedFn: func(svcErr error) error {
return svcErr
},
},
{
@@ -529,7 +506,7 @@ func TestProcessServiceDeletion(t *testing.T) {
cloud.Err = fmt.Errorf("Error Deleting the Loadbalancer")
},
expectedFn: func(svcErr error, retryDuration time.Duration) error {
expectedFn: func(svcErr error) error {
expectedError := "Error Deleting the Loadbalancer"
@@ -537,9 +514,6 @@ func TestProcessServiceDeletion(t *testing.T) {
return fmt.Errorf("Expected=%v Obtained=%v", expectedError, svcErr)
}
if retryDuration != minRetryDelay {
return fmt.Errorf("RetryDuration Expected=%v Obtained=%v", minRetryDelay, retryDuration)
}
return nil
},
},
@@ -554,21 +528,15 @@ func TestProcessServiceDeletion(t *testing.T) {
controller.cache.set(svcKey, svc)
},
expectedFn: func(svcErr error, retryDuration time.Duration) error {
expectedFn: func(svcErr error) error {
if svcErr != nil {
return fmt.Errorf("Expected=nil Obtained=%v", svcErr)
}
if retryDuration != doNotRetry {
//Retry duration should match
return fmt.Errorf("RetryDuration Expected=%v Obtained=%v", doNotRetry, retryDuration)
}
//It should no longer be in the workqueue.
// It should no longer be in the workqueue.
_, exist := controller.cache.get(svcKey)
if exist {
return fmt.Errorf("delete service error, workingQueue should not contain service: %s any more", svcKey)
return fmt.Errorf("delete service error, queue should not contain service: %s any more", svcKey)
}
return nil
@@ -580,8 +548,8 @@ func TestProcessServiceDeletion(t *testing.T) {
//Create a new controller.
controller, cloud, _ = newController()
tc.updateFn(controller)
obtainedErr, retryDuration := controller.processServiceDeletion(svcKey)
if err := tc.expectedFn(obtainedErr, retryDuration); err != nil {
obtainedErr := controller.processServiceDeletion(svcKey)
if err := tc.expectedFn(obtainedErr); err != nil {
t.Errorf("%v processServiceDeletion() %v", tc.testName, err)
}
}