Merge pull request #111039 from ncdc/resourcequota-update-filter-func

quota: add an update filter
This commit is contained in:
Kubernetes Prow Robot 2022-07-13 09:27:50 -07:00 committed by GitHub
commit d06fc1e1bb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 56 additions and 21 deletions

View File

@ -426,6 +426,7 @@ func startResourceQuotaController(ctx context.Context, controllerContext Control
IgnoredResourcesFunc: quotaConfiguration.IgnoredResources, IgnoredResourcesFunc: quotaConfiguration.IgnoredResources,
InformersStarted: controllerContext.InformersStarted, InformersStarted: controllerContext.InformersStarted,
Registry: generic.NewRegistry(quotaConfiguration.Evaluators()), Registry: generic.NewRegistry(quotaConfiguration.Evaluators()),
UpdateFilter: quotainstall.DefaultUpdateFilter(),
} }
if resourceQuotaControllerClient.CoreV1().RESTClient().GetRateLimiter() != nil { if resourceQuotaControllerClient.CoreV1().RESTClient().GetRateLimiter() != nil {
if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("resource_quota_controller", resourceQuotaControllerClient.CoreV1().RESTClient().GetRateLimiter()); err != nil { if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("resource_quota_controller", resourceQuotaControllerClient.CoreV1().RESTClient().GetRateLimiter()); err != nil {

View File

@ -73,6 +73,8 @@ type ControllerOptions struct {
InformerFactory informerfactory.InformerFactory InformerFactory informerfactory.InformerFactory
// Controls full resync of objects monitored for replenishment. // Controls full resync of objects monitored for replenishment.
ReplenishmentResyncPeriod controller.ResyncPeriodFunc ReplenishmentResyncPeriod controller.ResyncPeriodFunc
// Filters update events so we only enqueue the ones where we know quota will change
UpdateFilter UpdateFilter
} }
// Controller is responsible for tracking quota usage status in the system // Controller is responsible for tracking quota usage status in the system
@ -152,6 +154,7 @@ func NewController(options *ControllerOptions) (*Controller, error) {
resyncPeriod: options.ReplenishmentResyncPeriod, resyncPeriod: options.ReplenishmentResyncPeriod,
replenishmentFunc: rq.replenishQuota, replenishmentFunc: rq.replenishQuota,
registry: rq.registry, registry: rq.registry,
updateFilter: options.UpdateFilter,
} }
rq.quotaMonitor = qm rq.quotaMonitor = qm

View File

@ -23,7 +23,6 @@ import (
"k8s.io/klog/v2" "k8s.io/klog/v2"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
utilerrors "k8s.io/apimachinery/pkg/util/errors" utilerrors "k8s.io/apimachinery/pkg/util/errors"
@ -35,8 +34,6 @@ import (
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
"k8s.io/controller-manager/pkg/informerfactory" "k8s.io/controller-manager/pkg/informerfactory"
"k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/controller"
"k8s.io/kubernetes/pkg/quota/v1/evaluator/core"
"k8s.io/utils/clock"
) )
type eventType int type eventType int
@ -101,6 +98,8 @@ type QuotaMonitor struct {
// maintains list of evaluators // maintains list of evaluators
registry quota.Registry registry quota.Registry
updateFilter UpdateFilter
} }
// NewMonitor creates a new instance of a QuotaMonitor // NewMonitor creates a new instance of a QuotaMonitor
@ -133,27 +132,13 @@ func (m *monitor) Run() {
type monitors map[schema.GroupVersionResource]*monitor type monitors map[schema.GroupVersionResource]*monitor
// UpdateFilter is a function that returns true if the update event should be added to the resourceChanges queue.
type UpdateFilter func(resource schema.GroupVersionResource, oldObj, newObj interface{}) bool
func (qm *QuotaMonitor) controllerFor(resource schema.GroupVersionResource) (cache.Controller, error) { func (qm *QuotaMonitor) controllerFor(resource schema.GroupVersionResource) (cache.Controller, error) {
handlers := cache.ResourceEventHandlerFuncs{ handlers := cache.ResourceEventHandlerFuncs{
UpdateFunc: func(oldObj, newObj interface{}) { UpdateFunc: func(oldObj, newObj interface{}) {
// TODO: leaky abstraction! live w/ it for now, but should pass down an update filter func. if qm.updateFilter != nil && qm.updateFilter(resource, oldObj, newObj) {
// we only want to queue the updates we care about though as too much noise will overwhelm queue.
notifyUpdate := false
switch resource.GroupResource() {
case schema.GroupResource{Resource: "pods"}:
oldPod := oldObj.(*v1.Pod)
newPod := newObj.(*v1.Pod)
notifyUpdate = core.QuotaV1Pod(oldPod, clock.RealClock{}) && !core.QuotaV1Pod(newPod, clock.RealClock{})
case schema.GroupResource{Resource: "services"}:
oldService := oldObj.(*v1.Service)
newService := newObj.(*v1.Service)
notifyUpdate = core.GetQuotaServiceType(oldService) != core.GetQuotaServiceType(newService)
case schema.GroupResource{Resource: "persistentvolumeclaims"}:
oldPVC := oldObj.(*v1.PersistentVolumeClaim)
newPVC := newObj.(*v1.PersistentVolumeClaim)
notifyUpdate = core.RequiresQuotaReplenish(newPVC, oldPVC)
}
if notifyUpdate {
event := &event{ event := &event{
eventType: updateEvent, eventType: updateEvent,
obj: newObj, obj: newObj,

View File

@ -0,0 +1,46 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package install
import (
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/kubernetes/pkg/quota/v1/evaluator/core"
"k8s.io/utils/clock"
)
// DefaultUpdateFilter returns the default update filter for resource update events for consideration for quota.
func DefaultUpdateFilter() func(resource schema.GroupVersionResource, oldObj, newObj interface{}) bool {
return func(resource schema.GroupVersionResource, oldObj, newObj interface{}) bool {
switch resource.GroupResource() {
case schema.GroupResource{Resource: "pods"}:
oldPod := oldObj.(*v1.Pod)
newPod := newObj.(*v1.Pod)
return core.QuotaV1Pod(oldPod, clock.RealClock{}) && !core.QuotaV1Pod(newPod, clock.RealClock{})
case schema.GroupResource{Resource: "services"}:
oldService := oldObj.(*v1.Service)
newService := newObj.(*v1.Service)
return core.GetQuotaServiceType(oldService) != core.GetQuotaServiceType(newService)
case schema.GroupResource{Resource: "persistentvolumeclaims"}:
oldPVC := oldObj.(*v1.PersistentVolumeClaim)
newPVC := newObj.(*v1.PersistentVolumeClaim)
return core.RequiresQuotaReplenish(newPVC, oldPVC)
}
return false
}
}