diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index a28dba9dfa6..56b3e3395b5 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -411,6 +411,7 @@ func startPodGCController(ctx context.Context, controllerContext ControllerConte } func startResourceQuotaController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) { + ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "resourcequota-controller")) resourceQuotaControllerClient := controllerContext.ClientBuilder.ClientOrDie("resourcequota-controller") resourceQuotaControllerDiscoveryClient := controllerContext.ClientBuilder.DiscoveryClientOrDie("resourcequota-controller") discoveryFunc := resourceQuotaControllerDiscoveryClient.ServerPreferredNamespacedResources @@ -429,14 +430,14 @@ func startResourceQuotaController(ctx context.Context, controllerContext Control Registry: generic.NewRegistry(quotaConfiguration.Evaluators()), UpdateFilter: quotainstall.DefaultUpdateFilter(), } - resourceQuotaController, err := resourcequotacontroller.NewController(resourceQuotaControllerOptions) + resourceQuotaController, err := resourcequotacontroller.NewController(ctx, resourceQuotaControllerOptions) if err != nil { return nil, false, err } go resourceQuotaController.Run(ctx, int(controllerContext.ComponentConfig.ResourceQuotaController.ConcurrentResourceQuotaSyncs)) // Periodically the quota controller to detect new resource types - go resourceQuotaController.Sync(discoveryFunc, 30*time.Second, ctx.Done()) + go resourceQuotaController.Sync(ctx, discoveryFunc, 30*time.Second) return nil, true, nil } diff --git a/pkg/controller/resourcequota/resource_quota_controller.go b/pkg/controller/resourcequota/resource_quota_controller.go index 19e6b74c201..7faddb4a483 100644 --- a/pkg/controller/resourcequota/resource_quota_controller.go +++ b/pkg/controller/resourcequota/resource_quota_controller.go @@ -23,8 +23,6 @@ import ( "sync" "time" - "k8s.io/klog/v2" - v1 "k8s.io/api/core/v1" apiequality "k8s.io/apimachinery/pkg/api/equality" "k8s.io/apimachinery/pkg/api/errors" @@ -43,6 +41,7 @@ import ( "k8s.io/client-go/tools/cache" "k8s.io/client-go/util/workqueue" "k8s.io/controller-manager/pkg/informerfactory" + "k8s.io/klog/v2" "k8s.io/kubernetes/pkg/controller" ) @@ -51,7 +50,7 @@ type NamespacedResourcesFunc func() ([]*metav1.APIResourceList, error) // ReplenishmentFunc is a signal that a resource changed in specified namespace // that may require quota to be recalculated. -type ReplenishmentFunc func(groupResource schema.GroupResource, namespace string) +type ReplenishmentFunc func(ctx context.Context, groupResource schema.GroupResource, namespace string) // ControllerOptions holds options for creating a quota controller type ControllerOptions struct { @@ -104,7 +103,7 @@ type Controller struct { } // NewController creates a quota controller with specified options -func NewController(options *ControllerOptions) (*Controller, error) { +func NewController(ctx context.Context, options *ControllerOptions) (*Controller, error) { // build the resource quota controller rq := &Controller{ rqClient: options.QuotaClient, @@ -118,9 +117,13 @@ func NewController(options *ControllerOptions) (*Controller, error) { // set the synchronization handler rq.syncHandler = rq.syncResourceQuotaFromKey + logger := klog.FromContext(ctx) + options.ResourceQuotaInformer.Informer().AddEventHandlerWithResyncPeriod( cache.ResourceEventHandlerFuncs{ - AddFunc: rq.addQuota, + AddFunc: func(obj interface{}) { + rq.addQuota(logger, obj) + }, UpdateFunc: func(old, cur interface{}) { // We are only interested in observing updates to quota.spec to drive updates to quota.status. // We ignore all updates to quota.Status because they are all driven by this controller. @@ -135,12 +138,14 @@ func NewController(options *ControllerOptions) (*Controller, error) { if quota.Equals(oldResourceQuota.Spec.Hard, curResourceQuota.Spec.Hard) { return } - rq.addQuota(curResourceQuota) + rq.addQuota(logger, curResourceQuota) }, // This will enter the sync loop and no-op, because the controller has been deleted from the store. // Note that deleting a controller immediately after scaling it to 0 will not work. The recommended // way of achieving this is by performing a `stop` operation on the controller. - DeleteFunc: rq.enqueueResourceQuota, + DeleteFunc: func(obj interface{}) { + rq.enqueueResourceQuota(logger, obj) + }, }, rq.resyncPeriod(), ) @@ -167,20 +172,23 @@ func NewController(options *ControllerOptions) (*Controller, error) { return nil, err } - if err = qm.SyncMonitors(resources); err != nil { + if err = qm.SyncMonitors(ctx, resources); err != nil { utilruntime.HandleError(fmt.Errorf("initial monitor sync has error: %v", err)) } // only start quota once all informers synced - rq.informerSyncedFuncs = append(rq.informerSyncedFuncs, qm.IsSynced) + rq.informerSyncedFuncs = append(rq.informerSyncedFuncs, func() bool { + return qm.IsSynced(ctx) + }) } return rq, nil } // enqueueAll is called at the fullResyncPeriod interval to force a full recalculation of quota usage statistics -func (rq *Controller) enqueueAll() { - defer klog.V(4).Infof("Resource quota controller queued all resource quota for full calculation of usage") +func (rq *Controller) enqueueAll(ctx context.Context) { + logger := klog.FromContext(ctx) + defer logger.V(4).Info("Resource quota controller queued all resource quota for full calculation of usage") rqs, err := rq.rqLister.List(labels.Everything()) if err != nil { utilruntime.HandleError(fmt.Errorf("unable to enqueue all - error listing resource quotas: %v", err)) @@ -197,19 +205,19 @@ func (rq *Controller) enqueueAll() { } // obj could be an *v1.ResourceQuota, or a DeletionFinalStateUnknown marker item. -func (rq *Controller) enqueueResourceQuota(obj interface{}) { +func (rq *Controller) enqueueResourceQuota(logger klog.Logger, obj interface{}) { key, err := controller.KeyFunc(obj) if err != nil { - klog.Errorf("Couldn't get key for object %+v: %v", obj, err) + logger.Error(err, "Couldn't get key", "object", obj) return } rq.queue.Add(key) } -func (rq *Controller) addQuota(obj interface{}) { +func (rq *Controller) addQuota(logger klog.Logger, obj interface{}) { key, err := controller.KeyFunc(obj) if err != nil { - klog.Errorf("Couldn't get key for object %+v: %v", obj, err) + logger.Error(err, "Couldn't get key", "object", obj) return } @@ -239,29 +247,37 @@ func (rq *Controller) addQuota(obj interface{}) { } // worker runs a worker thread that just dequeues items, processes them, and marks them done. -func (rq *Controller) worker(ctx context.Context, queue workqueue.RateLimitingInterface) func(context.Context) { +func (rq *Controller) worker(queue workqueue.RateLimitingInterface) func(context.Context) { workFunc := func(ctx context.Context) bool { key, quit := queue.Get() if quit { return true } defer queue.Done(key) + rq.workerLock.RLock() defer rq.workerLock.RUnlock() + + logger := klog.FromContext(ctx) + logger = klog.LoggerWithValues(logger, "queueKey", key) + ctx = klog.NewContext(ctx, logger) + err := rq.syncHandler(ctx, key.(string)) if err == nil { queue.Forget(key) return false } + utilruntime.HandleError(err) queue.AddRateLimited(key) + return false } return func(ctx context.Context) { for { if quit := workFunc(ctx); quit { - klog.Infof("resource quota controller worker shutting down") + klog.FromContext(ctx).Info("resource quota controller worker shutting down") return } } @@ -274,11 +290,13 @@ func (rq *Controller) Run(ctx context.Context, workers int) { defer rq.queue.ShutDown() defer rq.missingUsageQueue.ShutDown() - klog.Infof("Starting resource quota controller") - defer klog.Infof("Shutting down resource quota controller") + logger := klog.FromContext(ctx) + + logger.Info("Starting resource quota controller") + defer logger.Info("Shutting down resource quota controller") if rq.quotaMonitor != nil { - go rq.quotaMonitor.Run(ctx.Done()) + go rq.quotaMonitor.Run(ctx) } if !cache.WaitForNamedCacheSync("resource quota", ctx.Done(), rq.informerSyncedFuncs...) { @@ -287,14 +305,14 @@ func (rq *Controller) Run(ctx context.Context, workers int) { // the workers that chug through the quota calculation backlog for i := 0; i < workers; i++ { - go wait.UntilWithContext(ctx, rq.worker(ctx, rq.queue), time.Second) - go wait.UntilWithContext(ctx, rq.worker(ctx, rq.missingUsageQueue), time.Second) + go wait.UntilWithContext(ctx, rq.worker(rq.queue), time.Second) + go wait.UntilWithContext(ctx, rq.worker(rq.missingUsageQueue), time.Second) } // the timer for how often we do a full recalculation across all quotas if rq.resyncPeriod() > 0 { - go wait.Until(func() { rq.enqueueAll() }, rq.resyncPeriod(), ctx.Done()) + go wait.UntilWithContext(ctx, rq.enqueueAll, rq.resyncPeriod()) } else { - klog.Warningf("periodic quota controller resync disabled") + logger.Info("periodic quota controller resync disabled") } <-ctx.Done() } @@ -302,8 +320,12 @@ func (rq *Controller) Run(ctx context.Context, workers int) { // syncResourceQuotaFromKey syncs a quota key func (rq *Controller) syncResourceQuotaFromKey(ctx context.Context, key string) (err error) { startTime := time.Now() + + logger := klog.FromContext(ctx) + logger = klog.LoggerWithValues(logger, "key", key) + defer func() { - klog.V(4).Infof("Finished syncing resource quota %q (%v)", key, time.Since(startTime)) + logger.V(4).Info("Finished syncing resource quota", "key", key, "duration", time.Since(startTime)) }() namespace, name, err := cache.SplitMetaNamespaceKey(key) @@ -312,11 +334,11 @@ func (rq *Controller) syncResourceQuotaFromKey(ctx context.Context, key string) } resourceQuota, err := rq.rqLister.ResourceQuotas(namespace).Get(name) if errors.IsNotFound(err) { - klog.Infof("Resource quota has been deleted %v", key) + logger.Info("Resource quota has been deleted", "key", key) return nil } if err != nil { - klog.Infof("Unable to retrieve resource quota %v from store: %v", key, err) + logger.Error(err, "Unable to retrieve resource quota from store", "key", key) return err } return rq.syncResourceQuota(ctx, resourceQuota) @@ -374,7 +396,7 @@ func (rq *Controller) syncResourceQuota(ctx context.Context, resourceQuota *v1.R } // replenishQuota is a replenishment function invoked by a controller to notify that a quota should be recalculated -func (rq *Controller) replenishQuota(groupResource schema.GroupResource, namespace string) { +func (rq *Controller) replenishQuota(ctx context.Context, groupResource schema.GroupResource, namespace string) { // check if the quota controller can evaluate this groupResource, if not, ignore it altogether... evaluator := rq.registry.Get(groupResource) if evaluator == nil { @@ -395,22 +417,24 @@ func (rq *Controller) replenishQuota(groupResource schema.GroupResource, namespa return } + logger := klog.FromContext(ctx) + // only queue those quotas that are tracking a resource associated with this kind. for i := range resourceQuotas { resourceQuota := resourceQuotas[i] resourceQuotaResources := quota.ResourceNames(resourceQuota.Status.Hard) if intersection := evaluator.MatchingResources(resourceQuotaResources); len(intersection) > 0 { // TODO: make this support targeted replenishment to a specific kind, right now it does a full recalc on that quota. - rq.enqueueResourceQuota(resourceQuota) + rq.enqueueResourceQuota(logger, resourceQuota) } } } // Sync periodically resyncs the controller when new resources are observed from discovery. -func (rq *Controller) Sync(discoveryFunc NamespacedResourcesFunc, period time.Duration, stopCh <-chan struct{}) { +func (rq *Controller) Sync(ctx context.Context, discoveryFunc NamespacedResourcesFunc, period time.Duration) { // Something has changed, so track the new state and perform a sync. oldResources := make(map[schema.GroupVersionResource]struct{}) - wait.Until(func() { + wait.UntilWithContext(ctx, func(ctx context.Context) { // Get the current resource list from discovery. newResources, err := GetQuotableResources(discoveryFunc) if err != nil { @@ -427,9 +451,11 @@ func (rq *Controller) Sync(discoveryFunc NamespacedResourcesFunc, period time.Du } } + logger := klog.FromContext(ctx) + // Decide whether discovery has reported a change. if reflect.DeepEqual(oldResources, newResources) { - klog.V(4).Infof("no resource updates from discovery, skipping resource quota sync") + logger.V(4).Info("no resource updates from discovery, skipping resource quota sync") return } @@ -439,12 +465,12 @@ func (rq *Controller) Sync(discoveryFunc NamespacedResourcesFunc, period time.Du defer rq.workerLock.Unlock() // Something has changed, so track the new state and perform a sync. - if klogV := klog.V(2); klogV.Enabled() { - klogV.Infof("syncing resource quota controller with updated resources from discovery: %s", printDiff(oldResources, newResources)) + if loggerV := logger.V(2); loggerV.Enabled() { + loggerV.Info("syncing resource quota controller with updated resources from discovery", "diff", printDiff(oldResources, newResources)) } // Perform the monitor resync and wait for controllers to report cache sync. - if err := rq.resyncMonitors(newResources); err != nil { + if err := rq.resyncMonitors(ctx, newResources); err != nil { utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %v", err)) return } @@ -452,15 +478,20 @@ func (rq *Controller) Sync(discoveryFunc NamespacedResourcesFunc, period time.Du // this protects us from deadlocks where available resources changed and one of our informer caches will never fill. // informers keep attempting to sync in the background, so retrying doesn't interrupt them. // the call to resyncMonitors on the reattempt will no-op for resources that still exist. - if rq.quotaMonitor != nil && !cache.WaitForNamedCacheSync("resource quota", waitForStopOrTimeout(stopCh, period), rq.quotaMonitor.IsSynced) { + if rq.quotaMonitor != nil && + !cache.WaitForNamedCacheSync( + "resource quota", + waitForStopOrTimeout(ctx.Done(), period), + func() bool { return rq.quotaMonitor.IsSynced(ctx) }, + ) { utilruntime.HandleError(fmt.Errorf("timed out waiting for quota monitor sync")) return } // success, remember newly synced resources oldResources = newResources - klog.V(2).Infof("synced quota controller") - }, period, stopCh) + logger.V(2).Info("synced quota controller") + }, period) } // printDiff returns a human-readable summary of what resources were added and removed @@ -495,15 +526,15 @@ func waitForStopOrTimeout(stopCh <-chan struct{}, timeout time.Duration) <-chan // resyncMonitors starts or stops quota monitors as needed to ensure that all // (and only) those resources present in the map are monitored. -func (rq *Controller) resyncMonitors(resources map[schema.GroupVersionResource]struct{}) error { +func (rq *Controller) resyncMonitors(ctx context.Context, resources map[schema.GroupVersionResource]struct{}) error { if rq.quotaMonitor == nil { return nil } - if err := rq.quotaMonitor.SyncMonitors(resources); err != nil { + if err := rq.quotaMonitor.SyncMonitors(ctx, resources); err != nil { return err } - rq.quotaMonitor.StartMonitors() + rq.quotaMonitor.StartMonitors(ctx) return nil } diff --git a/pkg/controller/resourcequota/resource_quota_controller_test.go b/pkg/controller/resourcequota/resource_quota_controller_test.go index ee234b4d60f..83b6d8cb3db 100644 --- a/pkg/controller/resourcequota/resource_quota_controller_test.go +++ b/pkg/controller/resourcequota/resource_quota_controller_test.go @@ -41,6 +41,7 @@ import ( "k8s.io/client-go/rest" core "k8s.io/client-go/testing" "k8s.io/client-go/tools/cache" + "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/pkg/controller" "k8s.io/kubernetes/pkg/quota/v1/install" ) @@ -123,7 +124,8 @@ func setupQuotaController(t *testing.T, kubeClient kubernetes.Interface, lister InformersStarted: alwaysStarted, InformerFactory: informerFactory, } - qc, err := NewController(resourceQuotaControllerOptions) + _, ctx := ktesting.NewTestContext(t) + qc, err := NewController(ctx, resourceQuotaControllerOptions) if err != nil { t.Fatal(err) } @@ -976,7 +978,8 @@ func TestAddQuota(t *testing.T) { } for _, tc := range testCases { - qc.addQuota(tc.quota) + logger, _ := ktesting.NewTestContext(t) + qc.addQuota(logger, tc.quota) if tc.expectedPriority { if e, a := 1, qc.missingUsageQueue.Len(); e != a { t.Errorf("%s: expected %v, got %v", tc.name, e, a) @@ -1075,7 +1078,8 @@ func TestDiscoverySync(t *testing.T) { // The 1s sleep in the test allows GetQuotableResources and // resyncMonitors to run ~5 times to ensure the changes to the // fakeDiscoveryClient are picked up. - go qc.Sync(fakeDiscoveryClient.ServerPreferredNamespacedResources, 200*time.Millisecond, stopSync) + _, ctx := ktesting.NewTestContext(t) + go qc.Sync(ctx, fakeDiscoveryClient.ServerPreferredNamespacedResources, 200*time.Millisecond) // Wait until the sync discovers the initial resources time.Sleep(1 * time.Second) diff --git a/pkg/controller/resourcequota/resource_quota_monitor.go b/pkg/controller/resourcequota/resource_quota_monitor.go index 40b89c5d6b9..dc46c326cdb 100644 --- a/pkg/controller/resourcequota/resource_quota_monitor.go +++ b/pkg/controller/resourcequota/resource_quota_monitor.go @@ -17,6 +17,7 @@ limitations under the License. package resourcequota import ( + "context" "fmt" "sync" "time" @@ -135,7 +136,9 @@ type monitors map[schema.GroupVersionResource]*monitor // UpdateFilter is a function that returns true if the update event should be added to the resourceChanges queue. type UpdateFilter func(resource schema.GroupVersionResource, oldObj, newObj interface{}) bool -func (qm *QuotaMonitor) controllerFor(resource schema.GroupVersionResource) (cache.Controller, error) { +func (qm *QuotaMonitor) controllerFor(ctx context.Context, resource schema.GroupVersionResource) (cache.Controller, error) { + logger := klog.FromContext(ctx) + handlers := cache.ResourceEventHandlerFuncs{ UpdateFunc: func(oldObj, newObj interface{}) { if qm.updateFilter != nil && qm.updateFilter(resource, oldObj, newObj) { @@ -163,11 +166,11 @@ func (qm *QuotaMonitor) controllerFor(resource schema.GroupVersionResource) (cac } shared, err := qm.informerFactory.ForResource(resource) if err == nil { - klog.V(4).Infof("QuotaMonitor using a shared informer for resource %q", resource.String()) + logger.V(4).Info("QuotaMonitor using a shared informer", "resource", resource.String()) shared.Informer().AddEventHandlerWithResyncPeriod(handlers, qm.resyncPeriod()) return shared.Informer().GetController(), nil } - klog.V(4).Infof("QuotaMonitor unable to use a shared informer for resource %q: %v", resource.String(), err) + logger.V(4).Error(err, "QuotaMonitor unable to use a shared informer", "resource", resource.String()) // TODO: if we can share storage with garbage collector, it may make sense to support other resources // until that time, aggregated api servers will have to run their own controller to reconcile their own quota. @@ -180,7 +183,9 @@ func (qm *QuotaMonitor) controllerFor(resource schema.GroupVersionResource) (cac // instead of immediately exiting on an error. It may be called before or after // Run. Monitors are NOT started as part of the sync. To ensure all existing // monitors are started, call StartMonitors. -func (qm *QuotaMonitor) SyncMonitors(resources map[schema.GroupVersionResource]struct{}) error { +func (qm *QuotaMonitor) SyncMonitors(ctx context.Context, resources map[schema.GroupVersionResource]struct{}) error { + logger := klog.FromContext(ctx) + qm.monitorLock.Lock() defer qm.monitorLock.Unlock() @@ -202,7 +207,7 @@ func (qm *QuotaMonitor) SyncMonitors(resources map[schema.GroupVersionResource]s kept++ continue } - c, err := qm.controllerFor(resource) + c, err := qm.controllerFor(ctx, resource) if err != nil { errs = append(errs, fmt.Errorf("couldn't start monitor for resource %q: %v", resource, err)) continue @@ -215,7 +220,7 @@ func (qm *QuotaMonitor) SyncMonitors(resources map[schema.GroupVersionResource]s listResourceFunc := generic.ListResourceUsingListerFunc(listerFunc, resource) evaluator = generic.NewObjectCountEvaluator(resource.GroupResource(), listResourceFunc, "") qm.registry.Add(evaluator) - klog.Infof("QuotaMonitor created object count evaluator for %s", resource.GroupResource()) + logger.Info("QuotaMonitor created object count evaluator", "resource", resource.GroupResource()) } // track the monitor @@ -230,7 +235,7 @@ func (qm *QuotaMonitor) SyncMonitors(resources map[schema.GroupVersionResource]s } } - klog.V(4).Infof("quota synced monitors; added %d, kept %d, removed %d", added, kept, len(toRemove)) + logger.V(4).Info("quota synced monitors", "added", added, "kept", kept, "removed", len(toRemove)) // NewAggregate returns nil if errs is 0-length return utilerrors.NewAggregate(errs) } @@ -240,7 +245,7 @@ func (qm *QuotaMonitor) SyncMonitors(resources map[schema.GroupVersionResource]s // // If called before Run, StartMonitors does nothing (as there is no stop channel // to support monitor/informer execution). -func (qm *QuotaMonitor) StartMonitors() { +func (qm *QuotaMonitor) StartMonitors(ctx context.Context) { qm.monitorLock.Lock() defer qm.monitorLock.Unlock() @@ -262,25 +267,27 @@ func (qm *QuotaMonitor) StartMonitors() { started++ } } - klog.V(4).Infof("QuotaMonitor started %d new monitors, %d currently running", started, len(monitors)) + klog.FromContext(ctx).V(4).Info("QuotaMonitor finished starting monitors", "new", started, "total", len(monitors)) } // IsSynced returns true if any monitors exist AND all those monitors' // controllers HasSynced functions return true. This means IsSynced could return // true at one time, and then later return false if all monitors were // reconstructed. -func (qm *QuotaMonitor) IsSynced() bool { +func (qm *QuotaMonitor) IsSynced(ctx context.Context) bool { + logger := klog.FromContext(ctx) + qm.monitorLock.RLock() defer qm.monitorLock.RUnlock() if len(qm.monitors) == 0 { - klog.V(4).Info("quota monitor not synced: no monitors") + logger.V(4).Info("quota monitor not synced: no monitors") return false } for resource, monitor := range qm.monitors { if !monitor.controller.HasSynced() { - klog.V(4).Infof("quota monitor not synced: %v", resource) + logger.V(4).Info("quota monitor not synced", "resource", resource) return false } } @@ -289,21 +296,23 @@ func (qm *QuotaMonitor) IsSynced() bool { // Run sets the stop channel and starts monitor execution until stopCh is // closed. Any running monitors will be stopped before Run returns. -func (qm *QuotaMonitor) Run(stopCh <-chan struct{}) { +func (qm *QuotaMonitor) Run(ctx context.Context) { defer utilruntime.HandleCrash() - klog.Infof("QuotaMonitor running") - defer klog.Infof("QuotaMonitor stopping") + logger := klog.FromContext(ctx) + + logger.Info("QuotaMonitor running") + defer logger.Info("QuotaMonitor stopping") // Set up the stop channel. qm.monitorLock.Lock() - qm.stopCh = stopCh + qm.stopCh = ctx.Done() qm.running = true qm.monitorLock.Unlock() // Start monitors and begin change processing until the stop channel is // closed. - qm.StartMonitors() + qm.StartMonitors(ctx) // The following workers are hanging forever until the queue is // shutted down, so we need to shut it down in a separate goroutine. @@ -311,9 +320,9 @@ func (qm *QuotaMonitor) Run(stopCh <-chan struct{}) { defer utilruntime.HandleCrash() defer qm.resourceChanges.ShutDown() - <-stopCh + <-ctx.Done() }() - wait.Until(qm.runProcessResourceChanges, 1*time.Second, stopCh) + wait.UntilWithContext(ctx, qm.runProcessResourceChanges, 1*time.Second) // Stop any running monitors. qm.monitorLock.Lock() @@ -326,16 +335,16 @@ func (qm *QuotaMonitor) Run(stopCh <-chan struct{}) { close(monitor.stopCh) } } - klog.Infof("QuotaMonitor stopped %d of %d monitors", stopped, len(monitors)) + logger.Info("QuotaMonitor stopped monitors", "stopped", stopped, "total", len(monitors)) } -func (qm *QuotaMonitor) runProcessResourceChanges() { - for qm.processResourceChanges() { +func (qm *QuotaMonitor) runProcessResourceChanges(ctx context.Context) { + for qm.processResourceChanges(ctx) { } } // Dequeueing an event from resourceChanges to process -func (qm *QuotaMonitor) processResourceChanges() bool { +func (qm *QuotaMonitor) processResourceChanges(ctx context.Context) bool { item, quit := qm.resourceChanges.Get() if quit { return false @@ -352,7 +361,13 @@ func (qm *QuotaMonitor) processResourceChanges() bool { utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err)) return true } - klog.V(4).Infof("QuotaMonitor process object: %s, namespace %s, name %s, uid %s, event type %v", event.gvr.String(), accessor.GetNamespace(), accessor.GetName(), string(accessor.GetUID()), event.eventType) - qm.replenishmentFunc(event.gvr.GroupResource(), accessor.GetNamespace()) + klog.FromContext(ctx).V(4).Info("QuotaMonitor process object", + "resource", event.gvr.String(), + "namespace", accessor.GetNamespace(), + "name", accessor.GetName(), + "uid", string(accessor.GetUID()), + "eventType", event.eventType, + ) + qm.replenishmentFunc(ctx, event.gvr.GroupResource(), accessor.GetNamespace()) return true } diff --git a/test/integration/quota/quota_test.go b/test/integration/quota/quota_test.go index ee6c94636c8..d7c02dab7ad 100644 --- a/test/integration/quota/quota_test.go +++ b/test/integration/quota/quota_test.go @@ -37,6 +37,7 @@ import ( "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" watchtools "k8s.io/client-go/tools/watch" + "k8s.io/klog/v2/ktesting" "k8s.io/kubernetes/cmd/kube-apiserver/app/options" "k8s.io/kubernetes/pkg/controller" replicationcontroller "k8s.io/kubernetes/pkg/controller/replication" @@ -75,7 +76,8 @@ func TestQuota(t *testing.T) { ns2 := framework.CreateNamespaceOrDie(clientset, "non-quotaed", t) defer framework.DeleteNamespaceOrDie(clientset, ns2, t) - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() informers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc()) @@ -102,14 +104,14 @@ func TestQuota(t *testing.T) { InformersStarted: informersStarted, Registry: generic.NewRegistry(qc.Evaluators()), } - resourceQuotaController, err := resourcequotacontroller.NewController(resourceQuotaControllerOptions) + resourceQuotaController, err := resourcequotacontroller.NewController(ctx, resourceQuotaControllerOptions) if err != nil { t.Fatalf("unexpected err: %v", err) } go resourceQuotaController.Run(ctx, 2) // Periodically the quota controller to detect new resource types - go resourceQuotaController.Sync(discoveryFunc, 30*time.Second, ctx.Done()) + go resourceQuotaController.Sync(ctx, discoveryFunc, 30*time.Second) informers.Start(ctx.Done()) close(informersStarted) @@ -304,7 +306,8 @@ plugins: ns := framework.CreateNamespaceOrDie(clientset, "quota", t) defer framework.DeleteNamespaceOrDie(clientset, ns, t) - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() informers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc()) @@ -331,14 +334,14 @@ plugins: InformersStarted: informersStarted, Registry: generic.NewRegistry(qc.Evaluators()), } - resourceQuotaController, err := resourcequotacontroller.NewController(resourceQuotaControllerOptions) + resourceQuotaController, err := resourcequotacontroller.NewController(ctx, resourceQuotaControllerOptions) if err != nil { t.Fatalf("unexpected err: %v", err) } go resourceQuotaController.Run(ctx, 2) // Periodically the quota controller to detect new resource types - go resourceQuotaController.Sync(discoveryFunc, 30*time.Second, ctx.Done()) + go resourceQuotaController.Sync(ctx, discoveryFunc, 30*time.Second) informers.Start(ctx.Done()) close(informersStarted) @@ -430,7 +433,8 @@ plugins: ns := framework.CreateNamespaceOrDie(clientset, "quota", t) defer framework.DeleteNamespaceOrDie(clientset, ns, t) - ctx, cancel := context.WithCancel(context.Background()) + _, ctx := ktesting.NewTestContext(t) + ctx, cancel := context.WithCancel(ctx) defer cancel() informers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc()) @@ -457,14 +461,14 @@ plugins: InformersStarted: informersStarted, Registry: generic.NewRegistry(qc.Evaluators()), } - resourceQuotaController, err := resourcequotacontroller.NewController(resourceQuotaControllerOptions) + resourceQuotaController, err := resourcequotacontroller.NewController(ctx, resourceQuotaControllerOptions) if err != nil { t.Fatalf("unexpected err: %v", err) } go resourceQuotaController.Run(ctx, 2) // Periodically the quota controller to detect new resource types - go resourceQuotaController.Sync(discoveryFunc, 30*time.Second, ctx.Done()) + go resourceQuotaController.Sync(ctx, discoveryFunc, 30*time.Second) informers.Start(ctx.Done()) close(informersStarted)