mirror of https://github.com/k3s-io/kubernetes.git
synced 2025-09-22 02:18:51 +00:00
Merge pull request #57149 from deads2k/rebase-02-quota
Automatic merge from submit-queue (batch tested with PRs 58144, 57149). If you want to cherry-pick this change to another branch, please follow the instructions here: https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md

make quota reusable

Quota is logically a reusable component, so that replenishment and admission can be done in constituent servers. This makes it possible to embed. More cleanup is probably required, but this is the minimum.
```diff
--- a/pkg/controller/resourcequota/resource_quota_controller.go
+++ b/pkg/controller/resourcequota/resource_quota_controller.go
@@ -148,28 +148,31 @@ func NewResourceQuotaController(options *ResourceQuotaControllerOptions) (*Resou
 		rq.resyncPeriod(),
 	)
 
-	qm := &QuotaMonitor{
-		informersStarted:  options.InformersStarted,
-		informerFactory:   options.InformerFactory,
-		ignoredResources:  options.IgnoredResourcesFunc(),
-		resourceChanges:   workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resource_quota_controller_resource_changes"),
-		resyncPeriod:      options.ReplenishmentResyncPeriod,
-		replenishmentFunc: rq.replenishQuota,
-		registry:          rq.registry,
-	}
-	rq.quotaMonitor = qm
-
-	// do initial quota monitor setup
-	resources, err := GetQuotableResources(options.DiscoveryFunc)
-	if err != nil {
-		return nil, err
-	}
-	if err = qm.syncMonitors(resources); err != nil {
-		utilruntime.HandleError(fmt.Errorf("initial monitor sync has error: %v", err))
-	}
-
-	// only start quota once all informers synced
-	rq.informerSyncedFuncs = append(rq.informerSyncedFuncs, qm.IsSynced)
+	if options.DiscoveryFunc != nil {
+		qm := &QuotaMonitor{
+			informersStarted:  options.InformersStarted,
+			informerFactory:   options.InformerFactory,
+			ignoredResources:  options.IgnoredResourcesFunc(),
+			resourceChanges:   workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resource_quota_controller_resource_changes"),
+			resyncPeriod:      options.ReplenishmentResyncPeriod,
+			replenishmentFunc: rq.replenishQuota,
+			registry:          rq.registry,
+		}
+
+		rq.quotaMonitor = qm
+
+		// do initial quota monitor setup
+		resources, err := GetQuotableResources(options.DiscoveryFunc)
+		if err != nil {
+			return nil, err
+		}
+		if err = qm.SyncMonitors(resources); err != nil {
+			utilruntime.HandleError(fmt.Errorf("initial monitor sync has error: %v", err))
+		}
+
+		// only start quota once all informers synced
+		rq.informerSyncedFuncs = append(rq.informerSyncedFuncs, qm.IsSynced)
+	}
 
 	return rq, nil
 }
```
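The effect of this hunk is that quota-monitor setup becomes opt-in: a constituent server without a discovery endpoint can construct the controller with a nil `DiscoveryFunc`. Below is a minimal sketch of that path; only the option names visible in the hunk above come from the source, while the surrounding function, its parameters, and any elided options are hypothetical.

```go
import (
	"k8s.io/apimachinery/pkg/runtime/schema"
	"k8s.io/kubernetes/pkg/controller"
	resourcequota "k8s.io/kubernetes/pkg/controller/resourcequota"
)

// Hypothetical embedding-server wiring; fields the options struct requires
// beyond those shown in the hunk above are elided and assumed.
func startQuotaControllerWithoutDiscovery(
	informersStarted <-chan struct{},
	informerFactory resourcequota.InformerFactory,
	ignoredFn func() map[schema.GroupResource]struct{},
	resyncFn controller.ResyncPeriodFunc,
	stopCh <-chan struct{},
) error {
	opts := &resourcequota.ResourceQuotaControllerOptions{
		// ...client, quota informer, and registry options elided...
		InformersStarted:          informersStarted,
		InformerFactory:           informerFactory,
		IgnoredResourcesFunc:      ignoredFn,
		ReplenishmentResyncPeriod: resyncFn,
		// DiscoveryFunc left nil: NewResourceQuotaController now skips
		// QuotaMonitor setup, and the nil checks added in the hunks below
		// keep Run, Sync, and resyncMonitors safe.
	}
	rq, err := resourcequota.NewResourceQuotaController(opts)
	if err != nil {
		return err
	}
	go rq.Run(5, stopCh) // 5 workers; Run tolerates the nil quotaMonitor
	return nil
}
```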
```diff
@@ -272,7 +275,9 @@ func (rq *ResourceQuotaController) Run(workers int, stopCh <-chan struct{}) {
 	glog.Infof("Starting resource quota controller")
 	defer glog.Infof("Shutting down resource quota controller")
 
-	go rq.quotaMonitor.Run(stopCh)
+	if rq.quotaMonitor != nil {
+		go rq.quotaMonitor.Run(stopCh)
+	}
 
 	if !controller.WaitForCacheSync("resource quota", stopCh, rq.informerSyncedFuncs...) {
 		return
```
```diff
@@ -443,7 +448,7 @@ func (rq *ResourceQuotaController) Sync(discoveryFunc NamespacedResourcesFunc, p
 			utilruntime.HandleError(fmt.Errorf("failed to sync resource monitors: %v", err))
 			return
 		}
-		if !controller.WaitForCacheSync("resource quota", stopCh, rq.quotaMonitor.IsSynced) {
+		if rq.quotaMonitor != nil && !controller.WaitForCacheSync("resource quota", stopCh, rq.quotaMonitor.IsSynced) {
 			utilruntime.HandleError(fmt.Errorf("timed out waiting for quota monitor sync"))
 		}
 	}, period, stopCh)
```
```diff
@@ -452,10 +457,14 @@ func (rq *ResourceQuotaController) Sync(discoveryFunc NamespacedResourcesFunc, p
 // resyncMonitors starts or stops quota monitors as needed to ensure that all
 // (and only) those resources present in the map are monitored.
 func (rq *ResourceQuotaController) resyncMonitors(resources map[schema.GroupVersionResource]struct{}) error {
-	if err := rq.quotaMonitor.syncMonitors(resources); err != nil {
+	if rq.quotaMonitor == nil {
+		return nil
+	}
+
+	if err := rq.quotaMonitor.SyncMonitors(resources); err != nil {
 		return err
 	}
-	rq.quotaMonitor.startMonitors()
+	rq.quotaMonitor.StartMonitors()
 	return nil
 }
 
```
```diff
--- a/pkg/controller/resourcequota/resource_quota_monitor.go
+++ b/pkg/controller/resourcequota/resource_quota_monitor.go
@@ -101,6 +101,18 @@ type QuotaMonitor struct {
 	registry quota.Registry
 }
 
+func NewQuotaMonitor(informersStarted <-chan struct{}, informerFactory InformerFactory, ignoredResources map[schema.GroupResource]struct{}, resyncPeriod controller.ResyncPeriodFunc, replenishmentFunc ReplenishmentFunc, registry quota.Registry) *QuotaMonitor {
+	return &QuotaMonitor{
+		informersStarted:  informersStarted,
+		informerFactory:   informerFactory,
+		ignoredResources:  ignoredResources,
+		resourceChanges:   workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "resource_quota_controller_resource_changes"),
+		resyncPeriod:      resyncPeriod,
+		replenishmentFunc: replenishmentFunc,
+		registry:          registry,
+	}
+}
+
 // monitor runs a Controller with a local stop channel.
 type monitor struct {
 	controller cache.Controller
```
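NewQuotaMonitor is the piece that makes embedding concrete: an external server can now build a monitor without touching unexported fields. A hedged usage sketch follows; the parameter types come from the signature above, but the argument values and where a real embedder would obtain them are assumptions.

```go
// Arguments mirror the NewQuotaMonitor signature in the hunk above.
qm := resourcequota.NewQuotaMonitor(
	informersStarted, // <-chan struct{}, closed once shared informers start
	informerFactory,  // resourcequota.InformerFactory
	map[schema.GroupResource]struct{}{}, // ignore nothing in this sketch
	resyncFn,      // controller.ResyncPeriodFunc
	replenishFn,   // resourcequota.ReplenishmentFunc
	quotaRegistry, // quota.Registry of evaluators to replenish for
)
```

Note that the constructor hard-codes the same named workqueue as the in-tree controller, so an embedded monitor reports its queue metrics under the resource_quota_controller_resource_changes name as well.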
```diff
@@ -172,13 +184,13 @@ func (qm *QuotaMonitor) controllerFor(resource schema.GroupVersionResource) (cac
 	return nil, fmt.Errorf("unable to monitor quota for resource %q", resource.String())
 }
 
-// syncMonitors rebuilds the monitor set according to the supplied resources,
+// SyncMonitors rebuilds the monitor set according to the supplied resources,
 // creating or deleting monitors as necessary. It will return any error
 // encountered, but will make an attempt to create a monitor for each resource
 // instead of immediately exiting on an error. It may be called before or after
 // Run. Monitors are NOT started as part of the sync. To ensure all existing
-// monitors are started, call startMonitors.
-func (qm *QuotaMonitor) syncMonitors(resources map[schema.GroupVersionResource]struct{}) error {
+// monitors are started, call StartMonitors.
+func (qm *QuotaMonitor) SyncMonitors(resources map[schema.GroupVersionResource]struct{}) error {
 	qm.monitorLock.Lock()
 	defer qm.monitorLock.Unlock()
 
```
```diff
@@ -233,12 +245,12 @@ func (qm *QuotaMonitor) syncMonitors(resources map[schema.GroupVersionResource]s
 	return utilerrors.NewAggregate(errs)
 }
 
-// startMonitors ensures the current set of monitors are running. Any newly
+// StartMonitors ensures the current set of monitors are running. Any newly
 // started monitors will also cause shared informers to be started.
 //
-// If called before Run, startMonitors does nothing (as there is no stop channel
+// If called before Run, StartMonitors does nothing (as there is no stop channel
 // to support monitor/informer execution).
-func (qm *QuotaMonitor) startMonitors() {
+func (qm *QuotaMonitor) StartMonitors() {
 	qm.monitorLock.Lock()
 	defer qm.monitorLock.Unlock()
 
```
```diff
@@ -297,7 +309,7 @@ func (qm *QuotaMonitor) Run(stopCh <-chan struct{}) {
 
 	// Start monitors and begin change processing until the stop channel is
 	// closed.
-	qm.startMonitors()
+	qm.StartMonitors()
 	wait.Until(qm.runProcessResourceChanges, 1*time.Second, stopCh)
 
 	// Stop any running monitors.
```
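Taken together, the exported surface defines a two-phase lifecycle for embedders: SyncMonitors registers monitors without starting them, and Run (which calls StartMonitors, as above) brings them up and processes replenishment events. A sketch of that sequence, assuming a qm built via NewQuotaMonitor and a discovery function discoveryFunc supplied by the embedding server:

```go
// Phase 1: compute the quotable set and register monitors; per the
// SyncMonitors doc comment, nothing is started yet.
resources, err := resourcequota.GetQuotableResources(discoveryFunc)
if err != nil {
	return err
}
if err := qm.SyncMonitors(resources); err != nil {
	// Partial failure is tolerated: SyncMonitors still registered whatever
	// monitors it could build.
	utilruntime.HandleError(fmt.Errorf("initial monitor sync has error: %v", err))
}

// Phase 2: Run starts the registered monitors (via StartMonitors) and
// drives replenishment until stopCh closes.
go qm.Run(stopCh)
```

A caller that needs to re-sync later, for example after discovery reports new resources, can call SyncMonitors again followed by StartMonitors, mirroring what resyncMonitors does in the controller hunk above.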