mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 10:51:29 +00:00
resourcequota: use contexual logging (#113315)
Signed-off-by: Andy Goldstein <andy.goldstein@redhat.com>
This commit is contained in:
parent
2f977fd8c4
commit
71ec5ed81d
@ -411,6 +411,7 @@ func startPodGCController(ctx context.Context, controllerContext ControllerConte
|
|||||||
}
|
}
|
||||||
|
|
||||||
func startResourceQuotaController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
func startResourceQuotaController(ctx context.Context, controllerContext ControllerContext) (controller.Interface, bool, error) {
|
||||||
|
ctx = klog.NewContext(ctx, klog.LoggerWithName(klog.FromContext(ctx), "resourcequota-controller"))
|
||||||
resourceQuotaControllerClient := controllerContext.ClientBuilder.ClientOrDie("resourcequota-controller")
|
resourceQuotaControllerClient := controllerContext.ClientBuilder.ClientOrDie("resourcequota-controller")
|
||||||
resourceQuotaControllerDiscoveryClient := controllerContext.ClientBuilder.DiscoveryClientOrDie("resourcequota-controller")
|
resourceQuotaControllerDiscoveryClient := controllerContext.ClientBuilder.DiscoveryClientOrDie("resourcequota-controller")
|
||||||
discoveryFunc := resourceQuotaControllerDiscoveryClient.ServerPreferredNamespacedResources
|
discoveryFunc := resourceQuotaControllerDiscoveryClient.ServerPreferredNamespacedResources
|
||||||
@ -429,14 +430,14 @@ func startResourceQuotaController(ctx context.Context, controllerContext Control
|
|||||||
Registry: generic.NewRegistry(quotaConfiguration.Evaluators()),
|
Registry: generic.NewRegistry(quotaConfiguration.Evaluators()),
|
||||||
UpdateFilter: quotainstall.DefaultUpdateFilter(),
|
UpdateFilter: quotainstall.DefaultUpdateFilter(),
|
||||||
}
|
}
|
||||||
resourceQuotaController, err := resourcequotacontroller.NewController(resourceQuotaControllerOptions)
|
resourceQuotaController, err := resourcequotacontroller.NewController(ctx, resourceQuotaControllerOptions)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, false, err
|
return nil, false, err
|
||||||
}
|
}
|
||||||
go resourceQuotaController.Run(ctx, int(controllerContext.ComponentConfig.ResourceQuotaController.ConcurrentResourceQuotaSyncs))
|
go resourceQuotaController.Run(ctx, int(controllerContext.ComponentConfig.ResourceQuotaController.ConcurrentResourceQuotaSyncs))
|
||||||
|
|
||||||
// Periodically the quota controller to detect new resource types
|
// Periodically the quota controller to detect new resource types
|
||||||
go resourceQuotaController.Sync(discoveryFunc, 30*time.Second, ctx.Done())
|
go resourceQuotaController.Sync(ctx, discoveryFunc, 30*time.Second)
|
||||||
|
|
||||||
return nil, true, nil
|
return nil, true, nil
|
||||||
}
|
}
|
||||||
|
@ -23,8 +23,6 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"k8s.io/klog/v2"
|
|
||||||
|
|
||||||
v1 "k8s.io/api/core/v1"
|
v1 "k8s.io/api/core/v1"
|
||||||
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
apiequality "k8s.io/apimachinery/pkg/api/equality"
|
||||||
"k8s.io/apimachinery/pkg/api/errors"
|
"k8s.io/apimachinery/pkg/api/errors"
|
||||||
@ -43,6 +41,7 @@ import (
|
|||||||
"k8s.io/client-go/tools/cache"
|
"k8s.io/client-go/tools/cache"
|
||||||
"k8s.io/client-go/util/workqueue"
|
"k8s.io/client-go/util/workqueue"
|
||||||
"k8s.io/controller-manager/pkg/informerfactory"
|
"k8s.io/controller-manager/pkg/informerfactory"
|
||||||
|
"k8s.io/klog/v2"
|
||||||
"k8s.io/kubernetes/pkg/controller"
|
"k8s.io/kubernetes/pkg/controller"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -51,7 +50,7 @@ type NamespacedResourcesFunc func() ([]*metav1.APIResourceList, error)
|
|||||||
|
|
||||||
// ReplenishmentFunc is a signal that a resource changed in specified namespace
|
// ReplenishmentFunc is a signal that a resource changed in specified namespace
|
||||||
// that may require quota to be recalculated.
|
// that may require quota to be recalculated.
|
||||||
type ReplenishmentFunc func(groupResource schema.GroupResource, namespace string)
|
type ReplenishmentFunc func(ctx context.Context, groupResource schema.GroupResource, namespace string)
|
||||||
|
|
||||||
// ControllerOptions holds options for creating a quota controller
|
// ControllerOptions holds options for creating a quota controller
|
||||||
type ControllerOptions struct {
|
type ControllerOptions struct {
|
||||||
@ -104,7 +103,7 @@ type Controller struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// NewController creates a quota controller with specified options
|
// NewController creates a quota controller with specified options
|
||||||
func NewController(options *ControllerOptions) (*Controller, error) {
|
func NewController(ctx context.Context, options *ControllerOptions) (*Controller, error) {
|
||||||
// build the resource quota controller
|
// build the resource quota controller
|
||||||
rq := &Controller{
|
rq := &Controller{
|
||||||
rqClient: options.QuotaClient,
|
rqClient: options.QuotaClient,
|
||||||
@ -118,9 +117,13 @@ func NewController(options *ControllerOptions) (*Controller, error) {
|
|||||||
// set the synchronization handler
|
// set the synchronization handler
|
||||||
rq.syncHandler = rq.syncResourceQuotaFromKey
|
rq.syncHandler = rq.syncResourceQuotaFromKey
|
||||||
|
|
||||||
|
logger := klog.FromContext(ctx)
|
||||||
|
|
||||||
options.ResourceQuotaInformer.Informer().AddEventHandlerWithResyncPeriod(
|
options.ResourceQuotaInformer.Informer().AddEventHandlerWithResyncPeriod(
|
||||||
cache.ResourceEventHandlerFuncs{
|
cache.ResourceEventHandlerFuncs{
|
||||||
AddFunc: rq.addQuota,
|
AddFunc: func(obj interface{}) {
|
||||||
|
rq.addQuota(logger, obj)
|
||||||
|
},
|
||||||
UpdateFunc: func(old, cur interface{}) {
|
UpdateFunc: func(old, cur interface{}) {
|
||||||
// We are only interested in observing updates to quota.spec to drive updates to quota.status.
|
// We are only interested in observing updates to quota.spec to drive updates to quota.status.
|
||||||
// We ignore all updates to quota.Status because they are all driven by this controller.
|
// We ignore all updates to quota.Status because they are all driven by this controller.
|
||||||
@ -135,12 +138,14 @@ func NewController(options *ControllerOptions) (*Controller, error) {
|
|||||||
if quota.Equals(oldResourceQuota.Spec.Hard, curResourceQuota.Spec.Hard) {
|
if quota.Equals(oldResourceQuota.Spec.Hard, curResourceQuota.Spec.Hard) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
rq.addQuota(curResourceQuota)
|
rq.addQuota(logger, curResourceQuota)
|
||||||
},
|
},
|
||||||
// This will enter the sync loop and no-op, because the controller has been deleted from the store.
|
// This will enter the sync loop and no-op, because the controller has been deleted from the store.
|
||||||
// Note that deleting a controller immediately after scaling it to 0 will not work. The recommended
|
// Note that deleting a controller immediately after scaling it to 0 will not work. The recommended
|
||||||
// way of achieving this is by performing a `stop` operation on the controller.
|
// way of achieving this is by performing a `stop` operation on the controller.
|
||||||
DeleteFunc: rq.enqueueResourceQuota,
|
DeleteFunc: func(obj interface{}) {
|
||||||
|
rq.enqueueResourceQuota(logger, obj)
|
||||||
|
},
|
||||||
},
|
},
|
||||||
rq.resyncPeriod(),
|
rq.resyncPeriod(),
|
||||||
)
|
)
|
||||||
@ -167,20 +172,23 @@ func NewController(options *ControllerOptions) (*Controller, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err = qm.SyncMonitors(resources); err != nil {
|
if err = qm.SyncMonitors(ctx, resources); err != nil {
|
||||||
utilruntime.HandleError(fmt.Errorf("initial monitor sync has error: %v", err))
|
utilruntime.HandleError(fmt.Errorf("initial monitor sync has error: %v", err))
|
||||||
}
|
}
|
||||||
|
|
||||||
// only start quota once all informers synced
|
// only start quota once all informers synced
|
||||||
rq.informerSyncedFuncs = append(rq.informerSyncedFuncs, qm.IsSynced)
|
rq.informerSyncedFuncs = append(rq.informerSyncedFuncs, func() bool {
|
||||||
|
return qm.IsSynced(ctx)
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
return rq, nil
|
return rq, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// enqueueAll is called at the fullResyncPeriod interval to force a full recalculation of quota usage statistics
|
// enqueueAll is called at the fullResyncPeriod interval to force a full recalculation of quota usage statistics
|
||||||
func (rq *Controller) enqueueAll() {
|
func (rq *Controller) enqueueAll(ctx context.Context) {
|
||||||
defer klog.V(4).Infof("Resource quota controller queued all resource quota for full calculation of usage")
|
logger := klog.FromContext(ctx)
|
||||||
|
defer logger.V(4).Info("Resource quota controller queued all resource quota for full calculation of usage")
|
||||||
rqs, err := rq.rqLister.List(labels.Everything())
|
rqs, err := rq.rqLister.List(labels.Everything())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
utilruntime.HandleError(fmt.Errorf("unable to enqueue all - error listing resource quotas: %v", err))
|
utilruntime.HandleError(fmt.Errorf("unable to enqueue all - error listing resource quotas: %v", err))
|
||||||
@ -197,19 +205,19 @@ func (rq *Controller) enqueueAll() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// obj could be an *v1.ResourceQuota, or a DeletionFinalStateUnknown marker item.
|
// obj could be an *v1.ResourceQuota, or a DeletionFinalStateUnknown marker item.
|
||||||
func (rq *Controller) enqueueResourceQuota(obj interface{}) {
|
func (rq *Controller) enqueueResourceQuota(logger klog.Logger, obj interface{}) {
|
||||||
key, err := controller.KeyFunc(obj)
|
key, err := controller.KeyFunc(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Couldn't get key for object %+v: %v", obj, err)
|
logger.Error(err, "Couldn't get key", "object", obj)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
rq.queue.Add(key)
|
rq.queue.Add(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (rq *Controller) addQuota(obj interface{}) {
|
func (rq *Controller) addQuota(logger klog.Logger, obj interface{}) {
|
||||||
key, err := controller.KeyFunc(obj)
|
key, err := controller.KeyFunc(obj)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Errorf("Couldn't get key for object %+v: %v", obj, err)
|
logger.Error(err, "Couldn't get key", "object", obj)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -239,29 +247,37 @@ func (rq *Controller) addQuota(obj interface{}) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
|
// worker runs a worker thread that just dequeues items, processes them, and marks them done.
|
||||||
func (rq *Controller) worker(ctx context.Context, queue workqueue.RateLimitingInterface) func(context.Context) {
|
func (rq *Controller) worker(queue workqueue.RateLimitingInterface) func(context.Context) {
|
||||||
workFunc := func(ctx context.Context) bool {
|
workFunc := func(ctx context.Context) bool {
|
||||||
key, quit := queue.Get()
|
key, quit := queue.Get()
|
||||||
if quit {
|
if quit {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
defer queue.Done(key)
|
defer queue.Done(key)
|
||||||
|
|
||||||
rq.workerLock.RLock()
|
rq.workerLock.RLock()
|
||||||
defer rq.workerLock.RUnlock()
|
defer rq.workerLock.RUnlock()
|
||||||
|
|
||||||
|
logger := klog.FromContext(ctx)
|
||||||
|
logger = klog.LoggerWithValues(logger, "queueKey", key)
|
||||||
|
ctx = klog.NewContext(ctx, logger)
|
||||||
|
|
||||||
err := rq.syncHandler(ctx, key.(string))
|
err := rq.syncHandler(ctx, key.(string))
|
||||||
if err == nil {
|
if err == nil {
|
||||||
queue.Forget(key)
|
queue.Forget(key)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
utilruntime.HandleError(err)
|
utilruntime.HandleError(err)
|
||||||
queue.AddRateLimited(key)
|
queue.AddRateLimited(key)
|
||||||
|
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
return func(ctx context.Context) {
|
return func(ctx context.Context) {
|
||||||
for {
|
for {
|
||||||
if quit := workFunc(ctx); quit {
|
if quit := workFunc(ctx); quit {
|
||||||
klog.Infof("resource quota controller worker shutting down")
|
klog.FromContext(ctx).Info("resource quota controller worker shutting down")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -274,11 +290,13 @@ func (rq *Controller) Run(ctx context.Context, workers int) {
|
|||||||
defer rq.queue.ShutDown()
|
defer rq.queue.ShutDown()
|
||||||
defer rq.missingUsageQueue.ShutDown()
|
defer rq.missingUsageQueue.ShutDown()
|
||||||
|
|
||||||
klog.Infof("Starting resource quota controller")
|
logger := klog.FromContext(ctx)
|
||||||
defer klog.Infof("Shutting down resource quota controller")
|
|
||||||
|
logger.Info("Starting resource quota controller")
|
||||||
|
defer logger.Info("Shutting down resource quota controller")
|
||||||
|
|
||||||
if rq.quotaMonitor != nil {
|
if rq.quotaMonitor != nil {
|
||||||
go rq.quotaMonitor.Run(ctx.Done())
|
go rq.quotaMonitor.Run(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !cache.WaitForNamedCacheSync("resource quota", ctx.Done(), rq.informerSyncedFuncs...) {
|
if !cache.WaitForNamedCacheSync("resource quota", ctx.Done(), rq.informerSyncedFuncs...) {
|
||||||
@ -287,14 +305,14 @@ func (rq *Controller) Run(ctx context.Context, workers int) {
|
|||||||
|
|
||||||
// the workers that chug through the quota calculation backlog
|
// the workers that chug through the quota calculation backlog
|
||||||
for i := 0; i < workers; i++ {
|
for i := 0; i < workers; i++ {
|
||||||
go wait.UntilWithContext(ctx, rq.worker(ctx, rq.queue), time.Second)
|
go wait.UntilWithContext(ctx, rq.worker(rq.queue), time.Second)
|
||||||
go wait.UntilWithContext(ctx, rq.worker(ctx, rq.missingUsageQueue), time.Second)
|
go wait.UntilWithContext(ctx, rq.worker(rq.missingUsageQueue), time.Second)
|
||||||
}
|
}
|
||||||
// the timer for how often we do a full recalculation across all quotas
|
// the timer for how often we do a full recalculation across all quotas
|
||||||
if rq.resyncPeriod() > 0 {
|
if rq.resyncPeriod() > 0 {
|
||||||
go wait.Until(func() { rq.enqueueAll() }, rq.resyncPeriod(), ctx.Done())
|
go wait.UntilWithContext(ctx, rq.enqueueAll, rq.resyncPeriod())
|
||||||
} else {
|
} else {
|
||||||
klog.Warningf("periodic quota controller resync disabled")
|
logger.Info("periodic quota controller resync disabled")
|
||||||
}
|
}
|
||||||
<-ctx.Done()
|
<-ctx.Done()
|
||||||
}
|
}
|
||||||
@ -302,8 +320,12 @@ func (rq *Controller) Run(ctx context.Context, workers int) {
|
|||||||
// syncResourceQuotaFromKey syncs a quota key
|
// syncResourceQuotaFromKey syncs a quota key
|
||||||
func (rq *Controller) syncResourceQuotaFromKey(ctx context.Context, key string) (err error) {
|
func (rq *Controller) syncResourceQuotaFromKey(ctx context.Context, key string) (err error) {
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
|
|
||||||
|
logger := klog.FromContext(ctx)
|
||||||
|
logger = klog.LoggerWithValues(logger, "key", key)
|
||||||
|
|
||||||
defer func() {
|
defer func() {
|
||||||
klog.V(4).Infof("Finished syncing resource quota %q (%v)", key, time.Since(startTime))
|
logger.V(4).Info("Finished syncing resource quota", "key", key, "duration", time.Since(startTime))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
namespace, name, err := cache.SplitMetaNamespaceKey(key)
|
namespace, name, err := cache.SplitMetaNamespaceKey(key)
|
||||||
@ -312,11 +334,11 @@ func (rq *Controller) syncResourceQuotaFromKey(ctx context.Context, key string)
|
|||||||
}
|
}
|
||||||
resourceQuota, err := rq.rqLister.ResourceQuotas(namespace).Get(name)
|
resourceQuota, err := rq.rqLister.ResourceQuotas(namespace).Get(name)
|
||||||
if errors.IsNotFound(err) {
|
if errors.IsNotFound(err) {
|
||||||
klog.Infof("Resource quota has been deleted %v", key)
|
logger.Info("Resource quota has been deleted", "key", key)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
klog.Infof("Unable to retrieve resource quota %v from store: %v", key, err)
|
logger.Error(err, "Unable to retrieve resource quota from store", "key", key)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
return rq.syncResourceQuota(ctx, resourceQuota)
|
return rq.syncResourceQuota(ctx, resourceQuota)
|
||||||
@ -374,7 +396,7 @@ func (rq *Controller) syncResourceQuota(ctx context.Context, resourceQuota *v1.R
|
|||||||
}
|
}
|
||||||
|
|
||||||
// replenishQuota is a replenishment function invoked by a controller to notify that a quota should be recalculated
|
// replenishQuota is a replenishment function invoked by a controller to notify that a quota should be recalculated
|
||||||
func (rq *Controller) replenishQuota(groupResource schema.GroupResource, namespace string) {
|
func (rq *Controller) replenishQuota(ctx context.Context, groupResource schema.GroupResource, namespace string) {
|
||||||
// check if the quota controller can evaluate this groupResource, if not, ignore it altogether...
|
// check if the quota controller can evaluate this groupResource, if not, ignore it altogether...
|
||||||
evaluator := rq.registry.Get(groupResource)
|
evaluator := rq.registry.Get(groupResource)
|
||||||
if evaluator == nil {
|
if evaluator == nil {
|
||||||
@ -395,22 +417,24 @@ func (rq *Controller) replenishQuota(groupResource schema.GroupResource, namespa
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger := klog.FromContext(ctx)
|
||||||
|
|
||||||
// only queue those quotas that are tracking a resource associated with this kind.
|
// only queue those quotas that are tracking a resource associated with this kind.
|
||||||
for i := range resourceQuotas {
|
for i := range resourceQuotas {
|
||||||
resourceQuota := resourceQuotas[i]
|
resourceQuota := resourceQuotas[i]
|
||||||
resourceQuotaResources := quota.ResourceNames(resourceQuota.Status.Hard)
|
resourceQuotaResources := quota.ResourceNames(resourceQuota.Status.Hard)
|
||||||
if intersection := evaluator.MatchingResources(resourceQuotaResources); len(intersection) > 0 {
|
if intersection := evaluator.MatchingResources(resourceQuotaResources); len(intersection) > 0 {
|
||||||
// TODO: make this support targeted replenishment to a specific kind, right now it does a full recalc on that quota.
|
// TODO: make this support targeted replenishment to a specific kind, right now it does a full recalc on that quota.
|
||||||
rq.enqueueResourceQuota(resourceQuota)
|
rq.enqueueResourceQuota(logger, resourceQuota)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Sync periodically resyncs the controller when new resources are observed from discovery.
|
// Sync periodically resyncs the controller when new resources are observed from discovery.
|
||||||
func (rq *Controller) Sync(discoveryFunc NamespacedResourcesFunc, period time.Duration, stopCh <-chan struct{}) {
|
func (rq *Controller) Sync(ctx context.Context, discoveryFunc NamespacedResourcesFunc, period time.Duration) {
|
||||||
// Something has changed, so track the new state and perform a sync.
|
// Something has changed, so track the new state and perform a sync.
|
||||||
oldResources := make(map[schema.GroupVersionResource]struct{})
|
oldResources := make(map[schema.GroupVersionResource]struct{})
|
||||||
wait.Until(func() {
|
wait.UntilWithContext(ctx, func(ctx context.Context) {
|
||||||
// Get the current resource list from discovery.
|
// Get the current resource list from discovery.
|
||||||
newResources, err := GetQuotableResources(discoveryFunc)
|
newResources, err := GetQuotableResources(discoveryFunc)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -427,9 +451,11 @@ func (rq *Controller) Sync(discoveryFunc NamespacedResourcesFunc, period time.Du
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
logger := klog.FromContext(ctx)
|
||||||
|
|
||||||
// Decide whether discovery has reported a change.
|
// Decide whether discovery has reported a change.
|
||||||
if reflect.DeepEqual(oldResources, newResources) {
|
if reflect.DeepEqual(oldResources, newResources) {
|
||||||
klog.V(4).Infof("no resource updates from discovery, skipping resource quota sync")
|
logger.V(4).Info("no resource updates from discovery, skipping resource quota sync")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -439,12 +465,12 @@ func (rq *Controller) Sync(discoveryFunc NamespacedResourcesFunc, period time.Du
|
|||||||
defer rq.workerLock.Unlock()
|
defer rq.workerLock.Unlock()
|
||||||
|
|
||||||
// Something has changed, so track the new state and perform a sync.
|
// Something has changed, so track the new state and perform a sync.
|
||||||
if klogV := klog.V(2); klogV.Enabled() {
|
if loggerV := logger.V(2); loggerV.Enabled() {
|
||||||
klogV.Infof("syncing resource quota controller with updated resources from discovery: %s", printDiff(oldResources, newResources))
|
loggerV.Info("syncing resource quota controller with updated resources from discovery", "diff", printDiff(oldResources, newResources))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Perform the monitor resync and wait for controllers to report cache sync.
|
// Perform the monitor resync and wait for controllers to report cache sync.
|
||||||
if err := rq.resyncMonitors(newResources); err != nil {
|
if err := rq.resyncMonitors(ctx, newResources); err != nil {
|
||||||
utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %v", err))
|
utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %v", err))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -452,15 +478,20 @@ func (rq *Controller) Sync(discoveryFunc NamespacedResourcesFunc, period time.Du
|
|||||||
// this protects us from deadlocks where available resources changed and one of our informer caches will never fill.
|
// this protects us from deadlocks where available resources changed and one of our informer caches will never fill.
|
||||||
// informers keep attempting to sync in the background, so retrying doesn't interrupt them.
|
// informers keep attempting to sync in the background, so retrying doesn't interrupt them.
|
||||||
// the call to resyncMonitors on the reattempt will no-op for resources that still exist.
|
// the call to resyncMonitors on the reattempt will no-op for resources that still exist.
|
||||||
if rq.quotaMonitor != nil && !cache.WaitForNamedCacheSync("resource quota", waitForStopOrTimeout(stopCh, period), rq.quotaMonitor.IsSynced) {
|
if rq.quotaMonitor != nil &&
|
||||||
|
!cache.WaitForNamedCacheSync(
|
||||||
|
"resource quota",
|
||||||
|
waitForStopOrTimeout(ctx.Done(), period),
|
||||||
|
func() bool { return rq.quotaMonitor.IsSynced(ctx) },
|
||||||
|
) {
|
||||||
utilruntime.HandleError(fmt.Errorf("timed out waiting for quota monitor sync"))
|
utilruntime.HandleError(fmt.Errorf("timed out waiting for quota monitor sync"))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// success, remember newly synced resources
|
// success, remember newly synced resources
|
||||||
oldResources = newResources
|
oldResources = newResources
|
||||||
klog.V(2).Infof("synced quota controller")
|
logger.V(2).Info("synced quota controller")
|
||||||
}, period, stopCh)
|
}, period)
|
||||||
}
|
}
|
||||||
|
|
||||||
// printDiff returns a human-readable summary of what resources were added and removed
|
// printDiff returns a human-readable summary of what resources were added and removed
|
||||||
@ -495,15 +526,15 @@ func waitForStopOrTimeout(stopCh <-chan struct{}, timeout time.Duration) <-chan
|
|||||||
|
|
||||||
// resyncMonitors starts or stops quota monitors as needed to ensure that all
|
// resyncMonitors starts or stops quota monitors as needed to ensure that all
|
||||||
// (and only) those resources present in the map are monitored.
|
// (and only) those resources present in the map are monitored.
|
||||||
func (rq *Controller) resyncMonitors(resources map[schema.GroupVersionResource]struct{}) error {
|
func (rq *Controller) resyncMonitors(ctx context.Context, resources map[schema.GroupVersionResource]struct{}) error {
|
||||||
if rq.quotaMonitor == nil {
|
if rq.quotaMonitor == nil {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := rq.quotaMonitor.SyncMonitors(resources); err != nil {
|
if err := rq.quotaMonitor.SyncMonitors(ctx, resources); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
rq.quotaMonitor.StartMonitors()
|
rq.quotaMonitor.StartMonitors(ctx)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -41,6 +41,7 @@ import (
|
|||||||
"k8s.io/client-go/rest"
|
"k8s.io/client-go/rest"
|
||||||
core "k8s.io/client-go/testing"
|
core "k8s.io/client-go/testing"
|
||||||
"k8s.io/client-go/tools/cache"
|
"k8s.io/client-go/tools/cache"
|
||||||
|
"k8s.io/klog/v2/ktesting"
|
||||||
"k8s.io/kubernetes/pkg/controller"
|
"k8s.io/kubernetes/pkg/controller"
|
||||||
"k8s.io/kubernetes/pkg/quota/v1/install"
|
"k8s.io/kubernetes/pkg/quota/v1/install"
|
||||||
)
|
)
|
||||||
@ -123,7 +124,8 @@ func setupQuotaController(t *testing.T, kubeClient kubernetes.Interface, lister
|
|||||||
InformersStarted: alwaysStarted,
|
InformersStarted: alwaysStarted,
|
||||||
InformerFactory: informerFactory,
|
InformerFactory: informerFactory,
|
||||||
}
|
}
|
||||||
qc, err := NewController(resourceQuotaControllerOptions)
|
_, ctx := ktesting.NewTestContext(t)
|
||||||
|
qc, err := NewController(ctx, resourceQuotaControllerOptions)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
@ -976,7 +978,8 @@ func TestAddQuota(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
for _, tc := range testCases {
|
for _, tc := range testCases {
|
||||||
qc.addQuota(tc.quota)
|
logger, _ := ktesting.NewTestContext(t)
|
||||||
|
qc.addQuota(logger, tc.quota)
|
||||||
if tc.expectedPriority {
|
if tc.expectedPriority {
|
||||||
if e, a := 1, qc.missingUsageQueue.Len(); e != a {
|
if e, a := 1, qc.missingUsageQueue.Len(); e != a {
|
||||||
t.Errorf("%s: expected %v, got %v", tc.name, e, a)
|
t.Errorf("%s: expected %v, got %v", tc.name, e, a)
|
||||||
@ -1075,7 +1078,8 @@ func TestDiscoverySync(t *testing.T) {
|
|||||||
// The 1s sleep in the test allows GetQuotableResources and
|
// The 1s sleep in the test allows GetQuotableResources and
|
||||||
// resyncMonitors to run ~5 times to ensure the changes to the
|
// resyncMonitors to run ~5 times to ensure the changes to the
|
||||||
// fakeDiscoveryClient are picked up.
|
// fakeDiscoveryClient are picked up.
|
||||||
go qc.Sync(fakeDiscoveryClient.ServerPreferredNamespacedResources, 200*time.Millisecond, stopSync)
|
_, ctx := ktesting.NewTestContext(t)
|
||||||
|
go qc.Sync(ctx, fakeDiscoveryClient.ServerPreferredNamespacedResources, 200*time.Millisecond)
|
||||||
|
|
||||||
// Wait until the sync discovers the initial resources
|
// Wait until the sync discovers the initial resources
|
||||||
time.Sleep(1 * time.Second)
|
time.Sleep(1 * time.Second)
|
||||||
|
@ -17,6 +17,7 @@ limitations under the License.
|
|||||||
package resourcequota
|
package resourcequota
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
@ -135,7 +136,9 @@ type monitors map[schema.GroupVersionResource]*monitor
|
|||||||
// UpdateFilter is a function that returns true if the update event should be added to the resourceChanges queue.
|
// UpdateFilter is a function that returns true if the update event should be added to the resourceChanges queue.
|
||||||
type UpdateFilter func(resource schema.GroupVersionResource, oldObj, newObj interface{}) bool
|
type UpdateFilter func(resource schema.GroupVersionResource, oldObj, newObj interface{}) bool
|
||||||
|
|
||||||
func (qm *QuotaMonitor) controllerFor(resource schema.GroupVersionResource) (cache.Controller, error) {
|
func (qm *QuotaMonitor) controllerFor(ctx context.Context, resource schema.GroupVersionResource) (cache.Controller, error) {
|
||||||
|
logger := klog.FromContext(ctx)
|
||||||
|
|
||||||
handlers := cache.ResourceEventHandlerFuncs{
|
handlers := cache.ResourceEventHandlerFuncs{
|
||||||
UpdateFunc: func(oldObj, newObj interface{}) {
|
UpdateFunc: func(oldObj, newObj interface{}) {
|
||||||
if qm.updateFilter != nil && qm.updateFilter(resource, oldObj, newObj) {
|
if qm.updateFilter != nil && qm.updateFilter(resource, oldObj, newObj) {
|
||||||
@ -163,11 +166,11 @@ func (qm *QuotaMonitor) controllerFor(resource schema.GroupVersionResource) (cac
|
|||||||
}
|
}
|
||||||
shared, err := qm.informerFactory.ForResource(resource)
|
shared, err := qm.informerFactory.ForResource(resource)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
klog.V(4).Infof("QuotaMonitor using a shared informer for resource %q", resource.String())
|
logger.V(4).Info("QuotaMonitor using a shared informer", "resource", resource.String())
|
||||||
shared.Informer().AddEventHandlerWithResyncPeriod(handlers, qm.resyncPeriod())
|
shared.Informer().AddEventHandlerWithResyncPeriod(handlers, qm.resyncPeriod())
|
||||||
return shared.Informer().GetController(), nil
|
return shared.Informer().GetController(), nil
|
||||||
}
|
}
|
||||||
klog.V(4).Infof("QuotaMonitor unable to use a shared informer for resource %q: %v", resource.String(), err)
|
logger.V(4).Error(err, "QuotaMonitor unable to use a shared informer", "resource", resource.String())
|
||||||
|
|
||||||
// TODO: if we can share storage with garbage collector, it may make sense to support other resources
|
// TODO: if we can share storage with garbage collector, it may make sense to support other resources
|
||||||
// until that time, aggregated api servers will have to run their own controller to reconcile their own quota.
|
// until that time, aggregated api servers will have to run their own controller to reconcile their own quota.
|
||||||
@ -180,7 +183,9 @@ func (qm *QuotaMonitor) controllerFor(resource schema.GroupVersionResource) (cac
|
|||||||
// instead of immediately exiting on an error. It may be called before or after
|
// instead of immediately exiting on an error. It may be called before or after
|
||||||
// Run. Monitors are NOT started as part of the sync. To ensure all existing
|
// Run. Monitors are NOT started as part of the sync. To ensure all existing
|
||||||
// monitors are started, call StartMonitors.
|
// monitors are started, call StartMonitors.
|
||||||
func (qm *QuotaMonitor) SyncMonitors(resources map[schema.GroupVersionResource]struct{}) error {
|
func (qm *QuotaMonitor) SyncMonitors(ctx context.Context, resources map[schema.GroupVersionResource]struct{}) error {
|
||||||
|
logger := klog.FromContext(ctx)
|
||||||
|
|
||||||
qm.monitorLock.Lock()
|
qm.monitorLock.Lock()
|
||||||
defer qm.monitorLock.Unlock()
|
defer qm.monitorLock.Unlock()
|
||||||
|
|
||||||
@ -202,7 +207,7 @@ func (qm *QuotaMonitor) SyncMonitors(resources map[schema.GroupVersionResource]s
|
|||||||
kept++
|
kept++
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
c, err := qm.controllerFor(resource)
|
c, err := qm.controllerFor(ctx, resource)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errs = append(errs, fmt.Errorf("couldn't start monitor for resource %q: %v", resource, err))
|
errs = append(errs, fmt.Errorf("couldn't start monitor for resource %q: %v", resource, err))
|
||||||
continue
|
continue
|
||||||
@ -215,7 +220,7 @@ func (qm *QuotaMonitor) SyncMonitors(resources map[schema.GroupVersionResource]s
|
|||||||
listResourceFunc := generic.ListResourceUsingListerFunc(listerFunc, resource)
|
listResourceFunc := generic.ListResourceUsingListerFunc(listerFunc, resource)
|
||||||
evaluator = generic.NewObjectCountEvaluator(resource.GroupResource(), listResourceFunc, "")
|
evaluator = generic.NewObjectCountEvaluator(resource.GroupResource(), listResourceFunc, "")
|
||||||
qm.registry.Add(evaluator)
|
qm.registry.Add(evaluator)
|
||||||
klog.Infof("QuotaMonitor created object count evaluator for %s", resource.GroupResource())
|
logger.Info("QuotaMonitor created object count evaluator", "resource", resource.GroupResource())
|
||||||
}
|
}
|
||||||
|
|
||||||
// track the monitor
|
// track the monitor
|
||||||
@ -230,7 +235,7 @@ func (qm *QuotaMonitor) SyncMonitors(resources map[schema.GroupVersionResource]s
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
klog.V(4).Infof("quota synced monitors; added %d, kept %d, removed %d", added, kept, len(toRemove))
|
logger.V(4).Info("quota synced monitors", "added", added, "kept", kept, "removed", len(toRemove))
|
||||||
// NewAggregate returns nil if errs is 0-length
|
// NewAggregate returns nil if errs is 0-length
|
||||||
return utilerrors.NewAggregate(errs)
|
return utilerrors.NewAggregate(errs)
|
||||||
}
|
}
|
||||||
@ -240,7 +245,7 @@ func (qm *QuotaMonitor) SyncMonitors(resources map[schema.GroupVersionResource]s
|
|||||||
//
|
//
|
||||||
// If called before Run, StartMonitors does nothing (as there is no stop channel
|
// If called before Run, StartMonitors does nothing (as there is no stop channel
|
||||||
// to support monitor/informer execution).
|
// to support monitor/informer execution).
|
||||||
func (qm *QuotaMonitor) StartMonitors() {
|
func (qm *QuotaMonitor) StartMonitors(ctx context.Context) {
|
||||||
qm.monitorLock.Lock()
|
qm.monitorLock.Lock()
|
||||||
defer qm.monitorLock.Unlock()
|
defer qm.monitorLock.Unlock()
|
||||||
|
|
||||||
@ -262,25 +267,27 @@ func (qm *QuotaMonitor) StartMonitors() {
|
|||||||
started++
|
started++
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
klog.V(4).Infof("QuotaMonitor started %d new monitors, %d currently running", started, len(monitors))
|
klog.FromContext(ctx).V(4).Info("QuotaMonitor finished starting monitors", "new", started, "total", len(monitors))
|
||||||
}
|
}
|
||||||
|
|
||||||
// IsSynced returns true if any monitors exist AND all those monitors'
|
// IsSynced returns true if any monitors exist AND all those monitors'
|
||||||
// controllers HasSynced functions return true. This means IsSynced could return
|
// controllers HasSynced functions return true. This means IsSynced could return
|
||||||
// true at one time, and then later return false if all monitors were
|
// true at one time, and then later return false if all monitors were
|
||||||
// reconstructed.
|
// reconstructed.
|
||||||
func (qm *QuotaMonitor) IsSynced() bool {
|
func (qm *QuotaMonitor) IsSynced(ctx context.Context) bool {
|
||||||
|
logger := klog.FromContext(ctx)
|
||||||
|
|
||||||
qm.monitorLock.RLock()
|
qm.monitorLock.RLock()
|
||||||
defer qm.monitorLock.RUnlock()
|
defer qm.monitorLock.RUnlock()
|
||||||
|
|
||||||
if len(qm.monitors) == 0 {
|
if len(qm.monitors) == 0 {
|
||||||
klog.V(4).Info("quota monitor not synced: no monitors")
|
logger.V(4).Info("quota monitor not synced: no monitors")
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
for resource, monitor := range qm.monitors {
|
for resource, monitor := range qm.monitors {
|
||||||
if !monitor.controller.HasSynced() {
|
if !monitor.controller.HasSynced() {
|
||||||
klog.V(4).Infof("quota monitor not synced: %v", resource)
|
logger.V(4).Info("quota monitor not synced", "resource", resource)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -289,21 +296,23 @@ func (qm *QuotaMonitor) IsSynced() bool {
|
|||||||
|
|
||||||
// Run sets the stop channel and starts monitor execution until stopCh is
|
// Run sets the stop channel and starts monitor execution until stopCh is
|
||||||
// closed. Any running monitors will be stopped before Run returns.
|
// closed. Any running monitors will be stopped before Run returns.
|
||||||
func (qm *QuotaMonitor) Run(stopCh <-chan struct{}) {
|
func (qm *QuotaMonitor) Run(ctx context.Context) {
|
||||||
defer utilruntime.HandleCrash()
|
defer utilruntime.HandleCrash()
|
||||||
|
|
||||||
klog.Infof("QuotaMonitor running")
|
logger := klog.FromContext(ctx)
|
||||||
defer klog.Infof("QuotaMonitor stopping")
|
|
||||||
|
logger.Info("QuotaMonitor running")
|
||||||
|
defer logger.Info("QuotaMonitor stopping")
|
||||||
|
|
||||||
// Set up the stop channel.
|
// Set up the stop channel.
|
||||||
qm.monitorLock.Lock()
|
qm.monitorLock.Lock()
|
||||||
qm.stopCh = stopCh
|
qm.stopCh = ctx.Done()
|
||||||
qm.running = true
|
qm.running = true
|
||||||
qm.monitorLock.Unlock()
|
qm.monitorLock.Unlock()
|
||||||
|
|
||||||
// Start monitors and begin change processing until the stop channel is
|
// Start monitors and begin change processing until the stop channel is
|
||||||
// closed.
|
// closed.
|
||||||
qm.StartMonitors()
|
qm.StartMonitors(ctx)
|
||||||
|
|
||||||
// The following workers are hanging forever until the queue is
|
// The following workers are hanging forever until the queue is
|
||||||
// shutted down, so we need to shut it down in a separate goroutine.
|
// shutted down, so we need to shut it down in a separate goroutine.
|
||||||
@ -311,9 +320,9 @@ func (qm *QuotaMonitor) Run(stopCh <-chan struct{}) {
|
|||||||
defer utilruntime.HandleCrash()
|
defer utilruntime.HandleCrash()
|
||||||
defer qm.resourceChanges.ShutDown()
|
defer qm.resourceChanges.ShutDown()
|
||||||
|
|
||||||
<-stopCh
|
<-ctx.Done()
|
||||||
}()
|
}()
|
||||||
wait.Until(qm.runProcessResourceChanges, 1*time.Second, stopCh)
|
wait.UntilWithContext(ctx, qm.runProcessResourceChanges, 1*time.Second)
|
||||||
|
|
||||||
// Stop any running monitors.
|
// Stop any running monitors.
|
||||||
qm.monitorLock.Lock()
|
qm.monitorLock.Lock()
|
||||||
@ -326,16 +335,16 @@ func (qm *QuotaMonitor) Run(stopCh <-chan struct{}) {
|
|||||||
close(monitor.stopCh)
|
close(monitor.stopCh)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
klog.Infof("QuotaMonitor stopped %d of %d monitors", stopped, len(monitors))
|
logger.Info("QuotaMonitor stopped monitors", "stopped", stopped, "total", len(monitors))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (qm *QuotaMonitor) runProcessResourceChanges() {
|
func (qm *QuotaMonitor) runProcessResourceChanges(ctx context.Context) {
|
||||||
for qm.processResourceChanges() {
|
for qm.processResourceChanges(ctx) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Dequeueing an event from resourceChanges to process
|
// Dequeueing an event from resourceChanges to process
|
||||||
func (qm *QuotaMonitor) processResourceChanges() bool {
|
func (qm *QuotaMonitor) processResourceChanges(ctx context.Context) bool {
|
||||||
item, quit := qm.resourceChanges.Get()
|
item, quit := qm.resourceChanges.Get()
|
||||||
if quit {
|
if quit {
|
||||||
return false
|
return false
|
||||||
@ -352,7 +361,13 @@ func (qm *QuotaMonitor) processResourceChanges() bool {
|
|||||||
utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err))
|
utilruntime.HandleError(fmt.Errorf("cannot access obj: %v", err))
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
klog.V(4).Infof("QuotaMonitor process object: %s, namespace %s, name %s, uid %s, event type %v", event.gvr.String(), accessor.GetNamespace(), accessor.GetName(), string(accessor.GetUID()), event.eventType)
|
klog.FromContext(ctx).V(4).Info("QuotaMonitor process object",
|
||||||
qm.replenishmentFunc(event.gvr.GroupResource(), accessor.GetNamespace())
|
"resource", event.gvr.String(),
|
||||||
|
"namespace", accessor.GetNamespace(),
|
||||||
|
"name", accessor.GetName(),
|
||||||
|
"uid", string(accessor.GetUID()),
|
||||||
|
"eventType", event.eventType,
|
||||||
|
)
|
||||||
|
qm.replenishmentFunc(ctx, event.gvr.GroupResource(), accessor.GetNamespace())
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
@ -37,6 +37,7 @@ import (
|
|||||||
"k8s.io/client-go/informers"
|
"k8s.io/client-go/informers"
|
||||||
clientset "k8s.io/client-go/kubernetes"
|
clientset "k8s.io/client-go/kubernetes"
|
||||||
watchtools "k8s.io/client-go/tools/watch"
|
watchtools "k8s.io/client-go/tools/watch"
|
||||||
|
"k8s.io/klog/v2/ktesting"
|
||||||
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
|
"k8s.io/kubernetes/cmd/kube-apiserver/app/options"
|
||||||
"k8s.io/kubernetes/pkg/controller"
|
"k8s.io/kubernetes/pkg/controller"
|
||||||
replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
|
replicationcontroller "k8s.io/kubernetes/pkg/controller/replication"
|
||||||
@ -75,7 +76,8 @@ func TestQuota(t *testing.T) {
|
|||||||
ns2 := framework.CreateNamespaceOrDie(clientset, "non-quotaed", t)
|
ns2 := framework.CreateNamespaceOrDie(clientset, "non-quotaed", t)
|
||||||
defer framework.DeleteNamespaceOrDie(clientset, ns2, t)
|
defer framework.DeleteNamespaceOrDie(clientset, ns2, t)
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
_, ctx := ktesting.NewTestContext(t)
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
informers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
|
informers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
|
||||||
@ -102,14 +104,14 @@ func TestQuota(t *testing.T) {
|
|||||||
InformersStarted: informersStarted,
|
InformersStarted: informersStarted,
|
||||||
Registry: generic.NewRegistry(qc.Evaluators()),
|
Registry: generic.NewRegistry(qc.Evaluators()),
|
||||||
}
|
}
|
||||||
resourceQuotaController, err := resourcequotacontroller.NewController(resourceQuotaControllerOptions)
|
resourceQuotaController, err := resourcequotacontroller.NewController(ctx, resourceQuotaControllerOptions)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected err: %v", err)
|
t.Fatalf("unexpected err: %v", err)
|
||||||
}
|
}
|
||||||
go resourceQuotaController.Run(ctx, 2)
|
go resourceQuotaController.Run(ctx, 2)
|
||||||
|
|
||||||
// Periodically the quota controller to detect new resource types
|
// Periodically the quota controller to detect new resource types
|
||||||
go resourceQuotaController.Sync(discoveryFunc, 30*time.Second, ctx.Done())
|
go resourceQuotaController.Sync(ctx, discoveryFunc, 30*time.Second)
|
||||||
|
|
||||||
informers.Start(ctx.Done())
|
informers.Start(ctx.Done())
|
||||||
close(informersStarted)
|
close(informersStarted)
|
||||||
@ -304,7 +306,8 @@ plugins:
|
|||||||
ns := framework.CreateNamespaceOrDie(clientset, "quota", t)
|
ns := framework.CreateNamespaceOrDie(clientset, "quota", t)
|
||||||
defer framework.DeleteNamespaceOrDie(clientset, ns, t)
|
defer framework.DeleteNamespaceOrDie(clientset, ns, t)
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
_, ctx := ktesting.NewTestContext(t)
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
informers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
|
informers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
|
||||||
@ -331,14 +334,14 @@ plugins:
|
|||||||
InformersStarted: informersStarted,
|
InformersStarted: informersStarted,
|
||||||
Registry: generic.NewRegistry(qc.Evaluators()),
|
Registry: generic.NewRegistry(qc.Evaluators()),
|
||||||
}
|
}
|
||||||
resourceQuotaController, err := resourcequotacontroller.NewController(resourceQuotaControllerOptions)
|
resourceQuotaController, err := resourcequotacontroller.NewController(ctx, resourceQuotaControllerOptions)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected err: %v", err)
|
t.Fatalf("unexpected err: %v", err)
|
||||||
}
|
}
|
||||||
go resourceQuotaController.Run(ctx, 2)
|
go resourceQuotaController.Run(ctx, 2)
|
||||||
|
|
||||||
// Periodically the quota controller to detect new resource types
|
// Periodically the quota controller to detect new resource types
|
||||||
go resourceQuotaController.Sync(discoveryFunc, 30*time.Second, ctx.Done())
|
go resourceQuotaController.Sync(ctx, discoveryFunc, 30*time.Second)
|
||||||
|
|
||||||
informers.Start(ctx.Done())
|
informers.Start(ctx.Done())
|
||||||
close(informersStarted)
|
close(informersStarted)
|
||||||
@ -430,7 +433,8 @@ plugins:
|
|||||||
ns := framework.CreateNamespaceOrDie(clientset, "quota", t)
|
ns := framework.CreateNamespaceOrDie(clientset, "quota", t)
|
||||||
defer framework.DeleteNamespaceOrDie(clientset, ns, t)
|
defer framework.DeleteNamespaceOrDie(clientset, ns, t)
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
_, ctx := ktesting.NewTestContext(t)
|
||||||
|
ctx, cancel := context.WithCancel(ctx)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
informers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
|
informers := informers.NewSharedInformerFactory(clientset, controller.NoResyncPeriodFunc())
|
||||||
@ -457,14 +461,14 @@ plugins:
|
|||||||
InformersStarted: informersStarted,
|
InformersStarted: informersStarted,
|
||||||
Registry: generic.NewRegistry(qc.Evaluators()),
|
Registry: generic.NewRegistry(qc.Evaluators()),
|
||||||
}
|
}
|
||||||
resourceQuotaController, err := resourcequotacontroller.NewController(resourceQuotaControllerOptions)
|
resourceQuotaController, err := resourcequotacontroller.NewController(ctx, resourceQuotaControllerOptions)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("unexpected err: %v", err)
|
t.Fatalf("unexpected err: %v", err)
|
||||||
}
|
}
|
||||||
go resourceQuotaController.Run(ctx, 2)
|
go resourceQuotaController.Run(ctx, 2)
|
||||||
|
|
||||||
// Periodically the quota controller to detect new resource types
|
// Periodically the quota controller to detect new resource types
|
||||||
go resourceQuotaController.Sync(discoveryFunc, 30*time.Second, ctx.Done())
|
go resourceQuotaController.Sync(ctx, discoveryFunc, 30*time.Second)
|
||||||
|
|
||||||
informers.Start(ctx.Done())
|
informers.Start(ctx.Done())
|
||||||
close(informersStarted)
|
close(informersStarted)
|
||||||
|
Loading…
Reference in New Issue
Block a user