From a899441484f2edb437f2d08621a32920c5c8f4c5 Mon Sep 17 00:00:00 2001 From: Andy Goldstein Date: Fri, 8 Jul 2022 18:16:27 -0400 Subject: [PATCH] quota: add an update filter Fix a TODO to plumb an update filter from above in the resource quota monitor code that was handling update events for quota-able objects, instead of hard-coding the logic in the resource quota monitor. Signed-off-by: Andy Goldstein --- cmd/kube-controller-manager/app/core.go | 1 + .../resource_quota_controller.go | 3 ++ .../resourcequota/resource_quota_monitor.go | 27 +++-------- pkg/quota/v1/install/update_filter.go | 46 +++++++++++++++++++ 4 files changed, 56 insertions(+), 21 deletions(-) create mode 100644 pkg/quota/v1/install/update_filter.go diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index 203b2a2e971..e116288332e 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -426,6 +426,7 @@ func startResourceQuotaController(ctx context.Context, controllerContext Control IgnoredResourcesFunc: quotaConfiguration.IgnoredResources, InformersStarted: controllerContext.InformersStarted, Registry: generic.NewRegistry(quotaConfiguration.Evaluators()), + UpdateFilter: quotainstall.DefaultUpdateFilter(), } if resourceQuotaControllerClient.CoreV1().RESTClient().GetRateLimiter() != nil { if err := ratelimiter.RegisterMetricAndTrackRateLimiterUsage("resource_quota_controller", resourceQuotaControllerClient.CoreV1().RESTClient().GetRateLimiter()); err != nil { diff --git a/pkg/controller/resourcequota/resource_quota_controller.go b/pkg/controller/resourcequota/resource_quota_controller.go index 8441a3c4032..19e6b74c201 100644 --- a/pkg/controller/resourcequota/resource_quota_controller.go +++ b/pkg/controller/resourcequota/resource_quota_controller.go @@ -73,6 +73,8 @@ type ControllerOptions struct { InformerFactory informerfactory.InformerFactory // Controls full resync of objects monitored for replenishment. ReplenishmentResyncPeriod controller.ResyncPeriodFunc + // Filters update events so we only enqueue the ones where we know quota will change + UpdateFilter UpdateFilter } // Controller is responsible for tracking quota usage status in the system @@ -152,6 +154,7 @@ func NewController(options *ControllerOptions) (*Controller, error) { resyncPeriod: options.ReplenishmentResyncPeriod, replenishmentFunc: rq.replenishQuota, registry: rq.registry, + updateFilter: options.UpdateFilter, } rq.quotaMonitor = qm diff --git a/pkg/controller/resourcequota/resource_quota_monitor.go b/pkg/controller/resourcequota/resource_quota_monitor.go index e2defa9d009..40b89c5d6b9 100644 --- a/pkg/controller/resourcequota/resource_quota_monitor.go +++ b/pkg/controller/resourcequota/resource_quota_monitor.go @@ -23,7 +23,6 @@ import ( "k8s.io/klog/v2" - v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/runtime/schema" utilerrors "k8s.io/apimachinery/pkg/util/errors" @@ -35,8 +34,6 @@ import ( "k8s.io/client-go/util/workqueue" "k8s.io/controller-manager/pkg/informerfactory" "k8s.io/kubernetes/pkg/controller" - "k8s.io/kubernetes/pkg/quota/v1/evaluator/core" - "k8s.io/utils/clock" ) type eventType int @@ -101,6 +98,8 @@ type QuotaMonitor struct { // maintains list of evaluators registry quota.Registry + + updateFilter UpdateFilter } // NewMonitor creates a new instance of a QuotaMonitor @@ -133,27 +132,13 @@ func (m *monitor) Run() { type monitors map[schema.GroupVersionResource]*monitor +// UpdateFilter is a function that returns true if the update event should be added to the resourceChanges queue. +type UpdateFilter func(resource schema.GroupVersionResource, oldObj, newObj interface{}) bool + func (qm *QuotaMonitor) controllerFor(resource schema.GroupVersionResource) (cache.Controller, error) { handlers := cache.ResourceEventHandlerFuncs{ UpdateFunc: func(oldObj, newObj interface{}) { - // TODO: leaky abstraction! live w/ it for now, but should pass down an update filter func. - // we only want to queue the updates we care about though as too much noise will overwhelm queue. - notifyUpdate := false - switch resource.GroupResource() { - case schema.GroupResource{Resource: "pods"}: - oldPod := oldObj.(*v1.Pod) - newPod := newObj.(*v1.Pod) - notifyUpdate = core.QuotaV1Pod(oldPod, clock.RealClock{}) && !core.QuotaV1Pod(newPod, clock.RealClock{}) - case schema.GroupResource{Resource: "services"}: - oldService := oldObj.(*v1.Service) - newService := newObj.(*v1.Service) - notifyUpdate = core.GetQuotaServiceType(oldService) != core.GetQuotaServiceType(newService) - case schema.GroupResource{Resource: "persistentvolumeclaims"}: - oldPVC := oldObj.(*v1.PersistentVolumeClaim) - newPVC := newObj.(*v1.PersistentVolumeClaim) - notifyUpdate = core.RequiresQuotaReplenish(newPVC, oldPVC) - } - if notifyUpdate { + if qm.updateFilter != nil && qm.updateFilter(resource, oldObj, newObj) { event := &event{ eventType: updateEvent, obj: newObj, diff --git a/pkg/quota/v1/install/update_filter.go b/pkg/quota/v1/install/update_filter.go new file mode 100644 index 00000000000..eca16ec8fc9 --- /dev/null +++ b/pkg/quota/v1/install/update_filter.go @@ -0,0 +1,46 @@ +/* +Copyright 2022 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package install + +import ( + v1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/kubernetes/pkg/quota/v1/evaluator/core" + "k8s.io/utils/clock" +) + +// DefaultUpdateFilter returns the default update filter for resource update events for consideration for quota. +func DefaultUpdateFilter() func(resource schema.GroupVersionResource, oldObj, newObj interface{}) bool { + return func(resource schema.GroupVersionResource, oldObj, newObj interface{}) bool { + switch resource.GroupResource() { + case schema.GroupResource{Resource: "pods"}: + oldPod := oldObj.(*v1.Pod) + newPod := newObj.(*v1.Pod) + return core.QuotaV1Pod(oldPod, clock.RealClock{}) && !core.QuotaV1Pod(newPod, clock.RealClock{}) + case schema.GroupResource{Resource: "services"}: + oldService := oldObj.(*v1.Service) + newService := newObj.(*v1.Service) + return core.GetQuotaServiceType(oldService) != core.GetQuotaServiceType(newService) + case schema.GroupResource{Resource: "persistentvolumeclaims"}: + oldPVC := oldObj.(*v1.PersistentVolumeClaim) + newPVC := newObj.(*v1.PersistentVolumeClaim) + return core.RequiresQuotaReplenish(newPVC, oldPVC) + } + + return false + } +}