mirror of https://github.com/k3s-io/kubernetes.git (synced 2025-08-17 23:57:49 +00:00)

Apply exponential backoff in servicecontroller before retrying

Issue #21952

This commit is contained in:
parent fe03c663d9
commit 5b3bb56a4f
@@ -44,13 +44,16 @@ const (
 	// How long to wait before retrying the processing of a service change.
 	// If this changes, the sleep in hack/jenkins/e2e.sh before downing a cluster
 	// should be changed appropriately.
-	processingRetryInterval = 5 * time.Second
+	minRetryDelay = 5 * time.Second
+	maxRetryDelay = 300 * time.Second
 
 	clientRetryCount = 5
 	clientRetryInterval = 5 * time.Second
 
 	retryable = true
 	notRetryable = false
+
+	doNotRetry = time.Duration(0)
 )
 
 type cachedService struct {
@@ -61,6 +64,9 @@ type cachedService struct {
 
 	// Ensures only one goroutine can operate on this service at any given time.
 	mu sync.Mutex
+
+	// Controls error back-off
+	lastRetryDelay time.Duration
 }
 
 type serviceCache struct {
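The new lastRetryDelay field sits next to the mutex that, per the existing comment, ensures only one goroutine operates on a given service at a time; the back-off helpers added at the end of this diff likewise note that the mutex must be held. A minimal standalone sketch of that guarded-field pattern (type and field names here are illustrative, not the controller's):

package main

import (
	"fmt"
	"sync"
	"time"
)

// entry sketches the shape of cachedService after this change: per-service
// back-off state lives alongside the mutex that serializes work on the service.
type entry struct {
	mu             sync.Mutex
	lastRetryDelay time.Duration
}

// withLock runs fn while holding the entry's mutex, matching the
// "mutex must be held" convention documented on the new back-off helpers.
func (e *entry) withLock(fn func()) {
	e.mu.Lock()
	defer e.mu.Unlock()
	fn()
}

func main() {
	e := &entry{}
	e.withLock(func() { e.lastRetryDelay = 5 * time.Second })
	e.withLock(func() { fmt.Println("current back-off:", e.lastRetryDelay) })
}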
@@ -184,21 +190,26 @@ func (s *ServiceController) watchServices(serviceQueue *cache.DeltaFIFO) {
 			glog.Errorf("Received nil delta from watcher queue.")
 			continue
 		}
-		err, shouldRetry := s.processDelta(delta)
-		if shouldRetry {
+		err, retryDelay := s.processDelta(delta)
+		if retryDelay != 0 {
 			// Add the failed service back to the queue so we'll retry it.
-			glog.Errorf("Failed to process service delta. Retrying: %v", err)
-			time.Sleep(processingRetryInterval)
-			serviceQueue.AddIfNotPresent(deltas)
+			glog.Errorf("Failed to process service delta. Retrying in %s: %v", retryDelay, err)
+			go func(deltas cache.Deltas, delay time.Duration) {
+				time.Sleep(delay)
+				if err := serviceQueue.AddIfNotPresent(deltas); err != nil {
+					glog.Errorf("Error requeuing service delta - will not retry: %v", err)
+				}
+			}(deltas, retryDelay)
 		} else if err != nil {
 			runtime.HandleError(fmt.Errorf("Failed to process service delta. Not retrying: %v", err))
 		}
 	}
 }
 
-// Returns an error if processing the delta failed, along with a boolean
-// indicator of whether the processing should be retried.
-func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
+// Returns an error if processing the delta failed, along with a time.Duration
+// indicating whether processing should be retried; zero means no-retry; otherwise
+// we should retry in that Duration.
+func (s *ServiceController) processDelta(delta *cache.Delta) (error, time.Duration) {
 	deltaService, ok := delta.Object.(*api.Service)
 	var namespacedName types.NamespacedName
 	var cachedService *cachedService
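The requeue path above no longer blocks the watch loop: instead of sleeping inline for a fixed processingRetryInterval, it hands the deltas to a goroutine that sleeps for the computed delay, then calls AddIfNotPresent and logs the error if the requeue itself fails. A self-contained sketch of that delayed-requeue pattern, using a hypothetical requeue callback in place of the DeltaFIFO:

package main

import (
	"fmt"
	"sync"
	"time"
)

// workItem stands in for cache.Deltas; the real controller requeues the
// original deltas via serviceQueue.AddIfNotPresent.
type workItem string

// requeueAfter re-adds an item after a delay without blocking the caller,
// mirroring the goroutine introduced in watchServices. requeue is a
// hypothetical callback standing in for the DeltaFIFO.
func requeueAfter(item workItem, delay time.Duration, requeue func(workItem) error, wg *sync.WaitGroup) {
	wg.Add(1)
	go func(it workItem, d time.Duration) {
		defer wg.Done()
		time.Sleep(d)
		if err := requeue(it); err != nil {
			fmt.Printf("error requeuing %q - will not retry: %v\n", it, err)
		}
	}(item, delay)
}

func main() {
	var wg sync.WaitGroup
	queue := make(chan workItem, 1)
	requeueAfter("default/my-service", 10*time.Millisecond, func(it workItem) error {
		queue <- it
		return nil
	}, &wg)
	wg.Wait()
	fmt.Println("requeued:", <-queue)
}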
@@ -208,11 +219,11 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
 		// cache for deleting.
 		key, ok := delta.Object.(cache.DeletedFinalStateUnknown)
 		if !ok {
-			return fmt.Errorf("Delta contained object that wasn't a service or a deleted key: %+v", delta), notRetryable
+			return fmt.Errorf("Delta contained object that wasn't a service or a deleted key: %+v", delta), doNotRetry
 		}
 		cachedService, ok = s.cache.get(key.Key)
 		if !ok {
-			return fmt.Errorf("Service %s not in cache even though the watcher thought it was. Ignoring the deletion.", key), notRetryable
+			return fmt.Errorf("Service %s not in cache even though the watcher thought it was. Ignoring the deletion.", key), doNotRetry
 		}
 		deltaService = cachedService.lastState
 		delta.Object = deltaService
@@ -236,7 +247,7 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
 	service, err := s.kubeClient.Core().Services(namespacedName.Namespace).Get(namespacedName.Name)
 	if err != nil && !errors.IsNotFound(err) {
 		glog.Warningf("Failed to get most recent state of service %v from API (will retry): %v", namespacedName, err)
-		return err, retryable
+		return err, cachedService.nextRetryDelay()
 	} else if errors.IsNotFound(err) {
 		glog.V(2).Infof("Service %v not found, ensuring load balancer is deleted", namespacedName)
 		s.eventRecorder.Event(service, api.EventTypeNormal, "DeletingLoadBalancer", "Deleting load balancer")
@@ -244,11 +255,13 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
 		if err != nil {
 			message := "Error deleting load balancer (will retry): " + err.Error()
 			s.eventRecorder.Event(deltaService, api.EventTypeWarning, "DeletingLoadBalancerFailed", message)
-			return err, retryable
+			return err, cachedService.nextRetryDelay()
 		}
 		s.eventRecorder.Event(deltaService, api.EventTypeNormal, "DeletedLoadBalancer", "Deleted load balancer")
 		s.cache.delete(namespacedName.String())
-		return nil, notRetryable
+
+		cachedService.resetRetryDelay()
+		return nil, doNotRetry
 	}
 
 	// Update the cached service (used above for populating synthetic deletes)
@@ -265,7 +278,7 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
 		message += err.Error()
 		s.eventRecorder.Event(service, api.EventTypeWarning, "CreatingLoadBalancerFailed", message)
 
-		return err, retry
+		return err, cachedService.nextRetryDelay()
 	}
 	// Always update the cache upon success.
 	// NOTE: Since we update the cached service if and only if we successfully
@@ -274,7 +287,8 @@ func (s *ServiceController) processDelta(delta *cache.Delta) (error, bool) {
 	cachedService.appliedState = service
 	s.cache.set(namespacedName.String(), cachedService)
 
-	return nil, notRetryable
+	cachedService.resetRetryDelay()
+	return nil, doNotRetry
 }
 
 // Returns whatever error occurred along with a boolean indicator of whether it
@@ -738,3 +752,21 @@ func wantsLoadBalancer(service *api.Service) bool {
 func loadBalancerIPsAreEqual(oldService, newService *api.Service) bool {
 	return oldService.Spec.LoadBalancerIP == newService.Spec.LoadBalancerIP
 }
+
+// Computes the next retry, using exponential backoff
+// mutex must be held.
+func (s *cachedService) nextRetryDelay() time.Duration {
+	s.lastRetryDelay = s.lastRetryDelay * 2
+	if s.lastRetryDelay < minRetryDelay {
+		s.lastRetryDelay = minRetryDelay
+	}
+	if s.lastRetryDelay > maxRetryDelay {
+		s.lastRetryDelay = maxRetryDelay
+	}
+	return s.lastRetryDelay
+}
+
+// Resets the retry exponential backoff. mutex must be held.
+func (s *cachedService) resetRetryDelay() {
+	s.lastRetryDelay = time.Duration(0)
+}
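With minRetryDelay = 5s and maxRetryDelay = 300s, the doubling-and-clamping in nextRetryDelay yields the sequence 5s, 10s, 20s, 40s, 80s, 160s, 300s, 300s, ..., and resetRetryDelay drops the delay back to zero after a success so the next failure starts again at 5s. A small standalone reproduction of that arithmetic (not the controller code itself):

package main

import (
	"fmt"
	"time"
)

const (
	minDelay = 5 * time.Second   // mirrors minRetryDelay
	maxDelay = 300 * time.Second // mirrors maxRetryDelay
)

// backoff reproduces the math in cachedService.nextRetryDelay: double the
// previous delay, then clamp it into [minDelay, maxDelay].
type backoff struct {
	last time.Duration
}

func (b *backoff) next() time.Duration {
	b.last *= 2
	if b.last < minDelay {
		b.last = minDelay
	}
	if b.last > maxDelay {
		b.last = maxDelay
	}
	return b.last
}

// reset mirrors cachedService.resetRetryDelay.
func (b *backoff) reset() { b.last = 0 }

func main() {
	var b backoff
	for i := 0; i < 8; i++ {
		fmt.Println(b.next()) // 5s 10s 20s 40s 80s 160s 5m0s 5m0s
	}
	b.reset()
	fmt.Println(b.next()) // back to 5s after a success
}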