Merge pull request #104231 from astraw99/fix_unified_workers

Unify controller worker num param `threadiness` to `workers`
This commit is contained in:
Kubernetes Prow Robot 2021-08-27 09:34:05 -07:00 committed by GitHub
commit fca3175df7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 21 additions and 21 deletions

View File

@ -319,7 +319,7 @@ func (expc *expandController) expand(pvc *v1.PersistentVolumeClaim, pv *v1.Persi
return detailedErr return detailedErr
} }
// TODO make concurrency configurable (workers/threadiness argument). previously, nestedpendingoperations spawned unlimited goroutines // TODO make concurrency configurable (workers argument). previously, nestedpendingoperations spawned unlimited goroutines
func (expc *expandController) Run(stopCh <-chan struct{}) { func (expc *expandController) Run(stopCh <-chan struct{}) {
defer runtime.HandleCrash() defer runtime.HandleCrash()
defer expc.queue.ShutDown() defer expc.queue.ShutDown()

View File

@ -432,7 +432,7 @@ func (c *Controller) Enqueue() {
} }
// Run the controller until stopped. // Run the controller until stopped.
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) { func (c *Controller) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
// make sure the work queue is shutdown which will trigger workers to end // make sure the work queue is shutdown which will trigger workers to end
defer c.queue.ShutDown() defer c.queue.ShutDown()

View File

@ -32,7 +32,7 @@ import (
"k8s.io/apimachinery/pkg/util/wait" "k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
"k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
) )
// AutoAPIServiceRegistration is an interface which callers can re-declare locally and properly cast to for // AutoAPIServiceRegistration is an interface which callers can re-declare locally and properly cast to for
@ -103,7 +103,7 @@ func NewCRDRegistrationController(crdinformer crdinformers.CustomResourceDefinit
return c return c
} }
func (c *crdRegistrationController) Run(threadiness int, stopCh <-chan struct{}) { func (c *crdRegistrationController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
// make sure the work queue is shutdown which will trigger workers to end // make sure the work queue is shutdown which will trigger workers to end
defer c.queue.ShutDown() defer c.queue.ShutDown()
@ -130,8 +130,8 @@ func (c *crdRegistrationController) Run(threadiness int, stopCh <-chan struct{})
} }
close(c.syncedInitialSet) close(c.syncedInitialSet)
// start up your worker threads based on threadiness. Some controllers have multiple kinds of workers // start up your worker threads based on workers. Some controllers have multiple kinds of workers
for i := 0; i < threadiness; i++ { for i := 0; i < workers; i++ {
// runWorker will loop until "something bad" happens. The .Until will then rekick the worker // runWorker will loop until "something bad" happens. The .Until will then rekick the worker
// after one second // after one second
go wait.Until(c.runWorker, time.Second, stopCh) go wait.Until(c.runWorker, time.Second, stopCh)

View File

@ -179,7 +179,7 @@ func (c *KubernetesAPIApprovalPolicyConformantConditionController) sync(key stri
} }
// Run starts the controller. // Run starts the controller.
func (c *KubernetesAPIApprovalPolicyConformantConditionController) Run(threadiness int, stopCh <-chan struct{}) { func (c *KubernetesAPIApprovalPolicyConformantConditionController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
defer c.queue.ShutDown() defer c.queue.ShutDown()
@ -190,7 +190,7 @@ func (c *KubernetesAPIApprovalPolicyConformantConditionController) Run(threadine
return return
} }
for i := 0; i < threadiness; i++ { for i := 0; i < workers; i++ {
go wait.Until(c.runWorker, time.Second, stopCh) go wait.Until(c.runWorker, time.Second, stopCh)
} }

View File

@ -185,7 +185,7 @@ func (c *ConditionController) sync(key string) error {
} }
// Run starts the controller. // Run starts the controller.
func (c *ConditionController) Run(threadiness int, stopCh <-chan struct{}) { func (c *ConditionController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
defer c.queue.ShutDown() defer c.queue.ShutDown()
@ -196,7 +196,7 @@ func (c *ConditionController) Run(threadiness int, stopCh <-chan struct{}) {
return return
} }
for i := 0; i < threadiness; i++ { for i := 0; i < workers; i++ {
go wait.Until(c.runWorker, time.Second, stopCh) go wait.Until(c.runWorker, time.Second, stopCh)
} }

View File

@ -116,7 +116,7 @@ func (c *Controller) handleErr(err error, key interface{}) {
} }
// Run begins watching and syncing. // Run begins watching and syncing.
func (c *Controller) Run(threadiness int, stopCh chan struct{}) { func (c *Controller) Run(workers int, stopCh chan struct{}) {
defer runtime.HandleCrash() defer runtime.HandleCrash()
// Let the workers stop when we are done // Let the workers stop when we are done
@ -131,7 +131,7 @@ func (c *Controller) Run(threadiness int, stopCh chan struct{}) {
return return
} }
for i := 0; i < threadiness; i++ { for i := 0; i < workers; i++ {
go wait.Until(c.runWorker, time.Second, stopCh) go wait.Until(c.runWorker, time.Second, stopCh)
} }

View File

@ -270,7 +270,7 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg
return nil return nil
}) })
s.GenericAPIServer.AddPostStartHookOrDie("apiservice-status-available-controller", func(context genericapiserver.PostStartHookContext) error { s.GenericAPIServer.AddPostStartHookOrDie("apiservice-status-available-controller", func(context genericapiserver.PostStartHookContext) error {
// if we end up blocking for long periods of time, we may need to increase threadiness. // if we end up blocking for long periods of time, we may need to increase workers.
go availableController.Run(5, context.StopCh) go availableController.Run(5, context.StopCh)
return nil return nil
}) })

View File

@ -33,7 +33,7 @@ import (
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
"k8s.io/kube-aggregator/pkg/apis/apiregistration/v1" v1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"
apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1" apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset/typed/apiregistration/v1"
informers "k8s.io/kube-aggregator/pkg/client/informers/externalversions/apiregistration/v1" informers "k8s.io/kube-aggregator/pkg/client/informers/externalversions/apiregistration/v1"
listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/v1" listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/v1"
@ -132,7 +132,7 @@ func NewAutoRegisterController(apiServiceInformer informers.APIServiceInformer,
} }
// Run starts the autoregister controller in a loop which syncs API services until stopCh is closed. // Run starts the autoregister controller in a loop which syncs API services until stopCh is closed.
func (c *autoRegisterController) Run(threadiness int, stopCh <-chan struct{}) { func (c *autoRegisterController) Run(workers int, stopCh <-chan struct{}) {
// don't let panics crash the process // don't let panics crash the process
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
// make sure the work queue is shutdown which will trigger workers to end // make sure the work queue is shutdown which will trigger workers to end
@ -153,8 +153,8 @@ func (c *autoRegisterController) Run(threadiness int, stopCh <-chan struct{}) {
} }
} }
// start up your worker threads based on threadiness. Some controllers have multiple kinds of workers // start up your worker threads based on workers. Some controllers have multiple kinds of workers
for i := 0; i < threadiness; i++ { for i := 0; i < workers; i++ {
// runWorker will loop until "something bad" happens. The .Until will then rekick the worker // runWorker will loop until "something bad" happens. The .Until will then rekick the worker
// after one second // after one second
go wait.Until(c.runWorker, time.Second, stopCh) go wait.Until(c.runWorker, time.Second, stopCh)

View File

@ -484,7 +484,7 @@ func (c *AvailableConditionController) updateAPIServiceStatus(originalAPIService
} }
// Run starts the AvailableConditionController loop which manages the availability condition of API services. // Run starts the AvailableConditionController loop which manages the availability condition of API services.
func (c *AvailableConditionController) Run(threadiness int, stopCh <-chan struct{}) { func (c *AvailableConditionController) Run(workers int, stopCh <-chan struct{}) {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
defer c.queue.ShutDown() defer c.queue.ShutDown()
@ -495,7 +495,7 @@ func (c *AvailableConditionController) Run(threadiness int, stopCh <-chan struct
return return
} }
for i := 0; i < threadiness; i++ { for i := 0; i < workers; i++ {
go wait.Until(c.runWorker, time.Second, stopCh) go wait.Until(c.runWorker, time.Second, stopCh)
} }

View File

@ -148,7 +148,7 @@ func NewController(
// as syncing informer caches and starting workers. It will block until stopCh // as syncing informer caches and starting workers. It will block until stopCh
// is closed, at which point it will shutdown the workqueue and wait for // is closed, at which point it will shutdown the workqueue and wait for
// workers to finish processing their current work items. // workers to finish processing their current work items.
func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error { func (c *Controller) Run(workers int, stopCh <-chan struct{}) error {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
defer c.workqueue.ShutDown() defer c.workqueue.ShutDown()
@ -163,7 +163,7 @@ func (c *Controller) Run(threadiness int, stopCh <-chan struct{}) error {
klog.Info("Starting workers") klog.Info("Starting workers")
// Launch two workers to process Foo resources // Launch two workers to process Foo resources
for i := 0; i < threadiness; i++ { for i := 0; i < workers; i++ {
go wait.Until(c.runWorker, time.Second, stopCh) go wait.Until(c.runWorker, time.Second, stopCh)
} }