mirror of
https://github.com/k3s-io/kubernetes.git
synced 2026-02-22 07:03:28 +00:00
refactor(apiextensions-apiserver): Make APIApproval controller context-aware
This commit is contained in:
@@ -245,7 +245,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
|
||||
go namingController.RunWithContext(hookContext)
|
||||
go establishingController.RunWithContext(hookContext)
|
||||
go nonStructuralSchemaController.RunWithContext(5, hookContext)
|
||||
go apiApprovalController.Run(5, hookContext.Done())
|
||||
go apiApprovalController.RunWithContext(5, hookContext)
|
||||
go finalizingController.Run(5, hookContext.Done())
|
||||
|
||||
discoverySyncedCh := make(chan struct{})
|
||||
|
||||
@@ -44,7 +44,7 @@ type KubernetesAPIApprovalPolicyConformantConditionController struct {
|
||||
crdSynced cache.InformerSynced
|
||||
|
||||
// To allow injection for testing.
|
||||
syncFn func(key string) error
|
||||
syncFn func(ctx context.Context, key string) error
|
||||
|
||||
queue workqueue.TypedRateLimitingInterface[string]
|
||||
|
||||
@@ -127,7 +127,7 @@ func calculateCondition(crd *apiextensionsv1.CustomResourceDefinition) *apiexten
|
||||
}
|
||||
}
|
||||
|
||||
func (c *KubernetesAPIApprovalPolicyConformantConditionController) sync(key string) error {
|
||||
func (c *KubernetesAPIApprovalPolicyConformantConditionController) sync(ctx context.Context, key string) error {
|
||||
inCustomResourceDefinition, err := c.crdLister.Get(key)
|
||||
if apierrors.IsNotFound(err) {
|
||||
return nil
|
||||
@@ -163,7 +163,7 @@ func (c *KubernetesAPIApprovalPolicyConformantConditionController) sync(key stri
|
||||
crd := inCustomResourceDefinition.DeepCopy()
|
||||
apihelpers.SetCRDCondition(crd, *cond)
|
||||
|
||||
_, err = c.crdClient.CustomResourceDefinitions().UpdateStatus(context.TODO(), crd, metav1.UpdateOptions{})
|
||||
_, err = c.crdClient.CustomResourceDefinitions().UpdateStatus(ctx, crd, metav1.UpdateOptions{})
|
||||
if apierrors.IsNotFound(err) || apierrors.IsConflict(err) {
|
||||
// deleted or changed in the meantime, we'll get called again
|
||||
return nil
|
||||
@@ -183,37 +183,44 @@ func (c *KubernetesAPIApprovalPolicyConformantConditionController) sync(key stri
|
||||
|
||||
// Run starts the controller.
|
||||
func (c *KubernetesAPIApprovalPolicyConformantConditionController) Run(workers int, stopCh <-chan struct{}) {
|
||||
c.RunWithContext(workers, wait.ContextForChannel(stopCh))
|
||||
}
|
||||
|
||||
// RunWithContext starts the controller with a context.
|
||||
//
|
||||
//logcheck:context // RunWithContext should be used instead of Run in code which supports contextual logging.
|
||||
func (c *KubernetesAPIApprovalPolicyConformantConditionController) RunWithContext(workers int, ctx context.Context) {
|
||||
defer utilruntime.HandleCrash()
|
||||
defer c.queue.ShutDown()
|
||||
|
||||
klog.Infof("Starting KubernetesAPIApprovalPolicyConformantConditionController")
|
||||
defer klog.Infof("Shutting down KubernetesAPIApprovalPolicyConformantConditionController")
|
||||
|
||||
if !cache.WaitForCacheSync(stopCh, c.crdSynced) {
|
||||
if !cache.WaitForCacheSync(ctx.Done(), c.crdSynced) {
|
||||
return
|
||||
}
|
||||
|
||||
for i := 0; i < workers; i++ {
|
||||
go wait.Until(c.runWorker, time.Second, stopCh)
|
||||
go wait.UntilWithContext(ctx, c.runWorker, time.Second)
|
||||
}
|
||||
|
||||
<-stopCh
|
||||
<-ctx.Done()
|
||||
}
|
||||
|
||||
func (c *KubernetesAPIApprovalPolicyConformantConditionController) runWorker() {
|
||||
for c.processNextWorkItem() {
|
||||
func (c *KubernetesAPIApprovalPolicyConformantConditionController) runWorker(ctx context.Context) {
|
||||
for c.processNextWorkItem(ctx) {
|
||||
}
|
||||
}
|
||||
|
||||
// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit.
|
||||
func (c *KubernetesAPIApprovalPolicyConformantConditionController) processNextWorkItem() bool {
|
||||
func (c *KubernetesAPIApprovalPolicyConformantConditionController) processNextWorkItem(ctx context.Context) bool {
|
||||
key, quit := c.queue.Get()
|
||||
if quit {
|
||||
return false
|
||||
}
|
||||
defer c.queue.Done(key)
|
||||
|
||||
err := c.syncFn(key)
|
||||
err := c.syncFn(ctx, key)
|
||||
if err == nil {
|
||||
c.queue.Forget(key)
|
||||
return true
|
||||
|
||||
Reference in New Issue
Block a user