diff --git a/pkg/controller/endpointslice/BUILD b/pkg/controller/endpointslice/BUILD index d76da270626..9a6cc5bfa9f 100644 --- a/pkg/controller/endpointslice/BUILD +++ b/pkg/controller/endpointslice/BUILD @@ -41,6 +41,7 @@ go_library( "//staging/src/k8s.io/client-go/tools/record:go_default_library", "//staging/src/k8s.io/client-go/util/workqueue:go_default_library", "//staging/src/k8s.io/component-base/metrics/prometheus/ratelimiter:go_default_library", + "//vendor/golang.org/x/time/rate:go_default_library", "//vendor/k8s.io/klog:go_default_library", "//vendor/k8s.io/utils/net:go_default_library", ], diff --git a/pkg/controller/endpointslice/endpointslice_controller.go b/pkg/controller/endpointslice/endpointslice_controller.go index 1f26a165050..75f81ad672b 100644 --- a/pkg/controller/endpointslice/endpointslice_controller.go +++ b/pkg/controller/endpointslice/endpointslice_controller.go @@ -20,6 +20,8 @@ import ( "fmt" "time" + "golang.org/x/time/rate" + v1 "k8s.io/api/core/v1" discovery "k8s.io/api/discovery/v1beta1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -47,13 +49,24 @@ const ( // maxRetries is the number of times a service will be retried before it is // dropped out of the queue. Any sync error, such as a failure to create or // update an EndpointSlice could trigger a retry. With the current - // rate-limiter in use (5ms*2^(maxRetries-1)) the following numbers - // represent the sequence of delays between successive queuings of a - // service. + // rate-limiter in use (1s*2^(numRetries-1)) the following numbers represent + // the sequence of delays between successive queuings of a service. // - // 5ms, 10ms, 20ms, 40ms, 80ms, 160ms, 320ms, 640ms, 1.3s, 2.6s, 5.1s, - // 10.2s, 20.4s, 41s, 82s + // 1s, 2s, 4s, 8s, 16s, 32s, 64s, 128s, 256s, 512s, 1000s (max) maxRetries = 15 + + // endpointSliceChangeMinSyncDelay indicates the minimum delay before + // queuing a syncService call after an EndpointSlice changes. 
If + // endpointUpdatesBatchPeriod is greater than this value, it will be used + // instead. This helps batch processing of changes to multiple + // EndpointSlices. + endpointSliceChangeMinSyncDelay = 1 * time.Second + + // defaultSyncBackOff is the default backoff period for syncService calls. + defaultSyncBackOff = 1 * time.Second + // maxSyncBackOff is the max backoff period for syncService calls. + maxSyncBackOff = 1000 * time.Second + + // controllerName is a unique value used with LabelManagedBy to indicated // the component managing an EndpointSlice. controllerName = "endpointslice-controller.k8s.io" @@ -80,8 +93,19 @@ func NewController(podInformer coreinformers.PodInformer, endpointslicemetrics.RegisterMetrics() c := &Controller{ - client: client, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "endpoint_slice"), + client: client, + // This is similar to the DefaultControllerRateLimiter, just with a + // significantly higher default backoff (1s vs 5ms). This controller + // processes events that can require significant EndpointSlice changes, + // such as an update to a Service or Deployment. A more significant + // rate limit back off here helps ensure that the Controller does not + // overwhelm the API Server. + queue: workqueue.NewNamedRateLimitingQueue(workqueue.NewMaxOfRateLimiter( + workqueue.NewItemExponentialFailureRateLimiter(defaultSyncBackOff, maxSyncBackOff), + // 10 qps, 100 bucket size. This is only for retry speed and it's + // only the overall factor (not per item). 
+ &workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)}, + ), "endpoint_slice"), workerLoopPeriod: time.Second, } @@ -409,7 +433,14 @@ func (c *Controller) queueServiceForEndpointSlice(endpointSlice *discovery.Endpo utilruntime.HandleError(fmt.Errorf("Couldn't get key for EndpointSlice %+v: %v", endpointSlice, err)) return } - c.queue.Add(key) + + // queue after the max of endpointSliceChangeMinSyncDelay and + // endpointUpdatesBatchPeriod. + delay := endpointSliceChangeMinSyncDelay + if c.endpointUpdatesBatchPeriod > delay { + delay = c.endpointUpdatesBatchPeriod + } + c.queue.AddAfter(key, delay) } func (c *Controller) addPod(obj interface{}) {