mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-09-27 05:04:52 +00:00
make quota reusable
This commit is contained in:
@@ -148,28 +148,31 @@ func NewResourceQuotaController(options *ResourceQuotaControllerOptions) (*Resou
|
||||
rq.resyncPeriod(),
|
||||
)
|
||||
|
||||
qm := &QuotaMonitor{
|
||||
informersStarted: options.InformersStarted,
|
||||
informerFactory: options.InformerFactory,
|
||||
ignoredResources: options.IgnoredResourcesFunc(),
|
||||
resourceChanges: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resource_quota_controller_resource_changes"),
|
||||
resyncPeriod: options.ReplenishmentResyncPeriod,
|
||||
replenishmentFunc: rq.replenishQuota,
|
||||
registry: rq.registry,
|
||||
}
|
||||
rq.quotaMonitor = qm
|
||||
if options.DiscoveryFunc != nil {
|
||||
qm := &QuotaMonitor{
|
||||
informersStarted: options.InformersStarted,
|
||||
informerFactory: options.InformerFactory,
|
||||
ignoredResources: options.IgnoredResourcesFunc(),
|
||||
resourceChanges: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resource_quota_controller_resource_changes"),
|
||||
resyncPeriod: options.ReplenishmentResyncPeriod,
|
||||
replenishmentFunc: rq.replenishQuota,
|
||||
registry: rq.registry,
|
||||
}
|
||||
|
||||
// do initial quota monitor setup
|
||||
resources, err := GetQuotableResources(options.DiscoveryFunc)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = qm.syncMonitors(resources); err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("initial monitor sync has error: %v", err))
|
||||
}
|
||||
rq.quotaMonitor = qm
|
||||
|
||||
// only start quota once all informers synced
|
||||
rq.informerSyncedFuncs = append(rq.informerSyncedFuncs, qm.IsSynced)
|
||||
// do initial quota monitor setup
|
||||
resources, err := GetQuotableResources(options.DiscoveryFunc)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err = qm.SyncMonitors(resources); err != nil {
|
||||
utilruntime.HandleError(fmt.Errorf("initial monitor sync has error: %v", err))
|
||||
}
|
||||
|
||||
// only start quota once all informers synced
|
||||
rq.informerSyncedFuncs = append(rq.informerSyncedFuncs, qm.IsSynced)
|
||||
}
|
||||
|
||||
return rq, nil
|
||||
}
|
||||
@@ -274,7 +277,9 @@ func (rq *ResourceQuotaController) Run(workers int, stopCh <-chan struct{}) {
|
||||
glog.Infof("Starting resource quota controller")
|
||||
defer glog.Infof("Shutting down resource quota controller")
|
||||
|
||||
go rq.quotaMonitor.Run(stopCh)
|
||||
if rq.quotaMonitor != nil {
|
||||
go rq.quotaMonitor.Run(stopCh)
|
||||
}
|
||||
|
||||
if !controller.WaitForCacheSync("resource quota", stopCh, rq.informerSyncedFuncs...) {
|
||||
return
|
||||
@@ -446,7 +451,7 @@ func (rq *ResourceQuotaController) Sync(discoveryFunc NamespacedResourcesFunc, p
|
||||
utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %v", err))
|
||||
return
|
||||
}
|
||||
if !controller.WaitForCacheSync("resource quota", stopCh, rq.quotaMonitor.IsSynced) {
|
||||
if rq.quotaMonitor != nil && !controller.WaitForCacheSync("resource quota", stopCh, rq.quotaMonitor.IsSynced) {
|
||||
utilruntime.HandleError(fmt.Errorf("timed out waiting for quota monitor sync"))
|
||||
}
|
||||
}, period, stopCh)
|
||||
@@ -455,10 +460,14 @@ func (rq *ResourceQuotaController) Sync(discoveryFunc NamespacedResourcesFunc, p
|
||||
// resyncMonitors starts or stops quota monitors as needed to ensure that all
|
||||
// (and only) those resources present in the map are monitored.
|
||||
func (rq *ResourceQuotaController) resyncMonitors(resources map[schema.GroupVersionResource]struct{}) error {
|
||||
if err := rq.quotaMonitor.syncMonitors(resources); err != nil {
|
||||
if rq.quotaMonitor == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if err := rq.quotaMonitor.SyncMonitors(resources); err != nil {
|
||||
return err
|
||||
}
|
||||
rq.quotaMonitor.startMonitors()
|
||||
rq.quotaMonitor.StartMonitors()
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@@ -100,6 +100,18 @@ type QuotaMonitor struct {
|
||||
registry quota.Registry
|
||||
}
|
||||
|
||||
func NewQuotaMonitor(informersStarted <-chan struct{}, informerFactory InformerFactory, ignoredResources map[schema.GroupResource]struct{}, resyncPeriod controller.ResyncPeriodFunc, replenishmentFunc ReplenishmentFunc, registry quota.Registry) *QuotaMonitor {
|
||||
return &QuotaMonitor{
|
||||
informersStarted: informersStarted,
|
||||
informerFactory: informerFactory,
|
||||
ignoredResources: ignoredResources,
|
||||
resourceChanges: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resource_quota_controller_resource_changes"),
|
||||
resyncPeriod: resyncPeriod,
|
||||
replenishmentFunc: replenishmentFunc,
|
||||
registry: registry,
|
||||
}
|
||||
}
|
||||
|
||||
// monitor runs a Controller with a local stop channel.
|
||||
type monitor struct {
|
||||
controller cache.Controller
|
||||
@@ -171,13 +183,13 @@ func (qm *QuotaMonitor) controllerFor(resource schema.GroupVersionResource) (cac
|
||||
return nil, fmt.Errorf("unable to monitor quota for resource %q", resource.String())
|
||||
}
|
||||
|
||||
// syncMonitors rebuilds the monitor set according to the supplied resources,
|
||||
// SyncMonitors rebuilds the monitor set according to the supplied resources,
|
||||
// creating or deleting monitors as necessary. It will return any error
|
||||
// encountered, but will make an attempt to create a monitor for each resource
|
||||
// instead of immediately exiting on an error. It may be called before or after
|
||||
// Run. Monitors are NOT started as part of the sync. To ensure all existing
|
||||
// monitors are started, call startMonitors.
|
||||
func (qm *QuotaMonitor) syncMonitors(resources map[schema.GroupVersionResource]struct{}) error {
|
||||
// monitors are started, call StartMonitors.
|
||||
func (qm *QuotaMonitor) SyncMonitors(resources map[schema.GroupVersionResource]struct{}) error {
|
||||
qm.monitorLock.Lock()
|
||||
defer qm.monitorLock.Unlock()
|
||||
|
||||
@@ -232,12 +244,12 @@ func (qm *QuotaMonitor) syncMonitors(resources map[schema.GroupVersionResource]s
|
||||
return utilerrors.NewAggregate(errs)
|
||||
}
|
||||
|
||||
// startMonitors ensures the current set of monitors are running. Any newly
|
||||
// StartMonitors ensures the current set of monitors are running. Any newly
|
||||
// started monitors will also cause shared informers to be started.
|
||||
//
|
||||
// If called before Run, startMonitors does nothing (as there is no stop channel
|
||||
// If called before Run, StartMonitors does nothing (as there is no stop channel
|
||||
// to support monitor/informer execution).
|
||||
func (qm *QuotaMonitor) startMonitors() {
|
||||
func (qm *QuotaMonitor) StartMonitors() {
|
||||
qm.monitorLock.Lock()
|
||||
defer qm.monitorLock.Unlock()
|
||||
|
||||
@@ -295,7 +307,7 @@ func (qm *QuotaMonitor) Run(stopCh <-chan struct{}) {
|
||||
|
||||
// Start monitors and begin change processing until the stop channel is
|
||||
// closed.
|
||||
qm.startMonitors()
|
||||
qm.StartMonitors()
|
||||
wait.Until(qm.runProcessResourceChanges, 1*time.Second, stopCh)
|
||||
|
||||
// Stop any running monitors.
|
||||
|
Reference in New Issue
Block a user