From 3f0b6d390cf49acdab7779f559318c6157880441 Mon Sep 17 00:00:00 2001 From: Mike Dame Date: Thu, 22 Apr 2021 14:35:26 -0400 Subject: [PATCH] Wire contexts to RBAC controllers --- cmd/kube-controller-manager/app/rbac.go | 2 +- .../clusterroleaggregation_controller.go | 34 +++++++++---------- .../clusterroleaggregation_controller_test.go | 3 +- 3 files changed, 20 insertions(+), 19 deletions(-) diff --git a/cmd/kube-controller-manager/app/rbac.go b/cmd/kube-controller-manager/app/rbac.go index 718be6955ef..020ae38db63 100644 --- a/cmd/kube-controller-manager/app/rbac.go +++ b/cmd/kube-controller-manager/app/rbac.go @@ -31,6 +31,6 @@ func startClusterRoleAggregrationController(ctx context.Context, controllerConte go clusterroleaggregation.NewClusterRoleAggregation( controllerContext.InformerFactory.Rbac().V1().ClusterRoles(), controllerContext.ClientBuilder.ClientOrDie("clusterrole-aggregation-controller").RbacV1(), - ).Run(5, ctx.Done()) + ).Run(ctx, 5) return nil, true, nil } diff --git a/pkg/controller/clusterroleaggregation/clusterroleaggregation_controller.go b/pkg/controller/clusterroleaggregation/clusterroleaggregation_controller.go index 15a5fc16571..e979eb3a72f 100644 --- a/pkg/controller/clusterroleaggregation/clusterroleaggregation_controller.go +++ b/pkg/controller/clusterroleaggregation/clusterroleaggregation_controller.go @@ -49,7 +49,7 @@ type ClusterRoleAggregationController struct { clusterRoleLister rbaclisters.ClusterRoleLister clusterRolesSynced cache.InformerSynced - syncHandler func(key string) error + syncHandler func(ctx context.Context, key string) error queue workqueue.RateLimitingInterface } @@ -78,7 +78,7 @@ func NewClusterRoleAggregation(clusterRoleInformer rbacinformers.ClusterRoleInfo return c } -func (c *ClusterRoleAggregationController) syncClusterRole(key string) error { +func (c *ClusterRoleAggregationController) syncClusterRole(ctx context.Context, key string) error { _, name, err := cache.SplitMetaNamespaceKey(key) if err != nil { return err @@ -126,36 +126,36 @@ func (c *ClusterRoleAggregationController) syncClusterRole(key string) error { } if utilfeature.DefaultFeatureGate.Enabled(features.ServerSideApply) { - err = c.applyClusterRoles(sharedClusterRole.Name, newPolicyRules) + err = c.applyClusterRoles(ctx, sharedClusterRole.Name, newPolicyRules) if errors.IsUnsupportedMediaType(err) { // TODO: Remove this fallback at least one release after ServerSideApply GA // When Server Side Apply is not enabled, fallback to Update. This is required when running // 1.21 since api-server can be 1.20 during the upgrade/downgrade. // Since Server Side Apply is enabled by default in Beta, this fallback only kicks in // if the feature has been disabled using its feature flag. - err = c.updateClusterRoles(sharedClusterRole, newPolicyRules) + err = c.updateClusterRoles(ctx, sharedClusterRole, newPolicyRules) } } else { - err = c.updateClusterRoles(sharedClusterRole, newPolicyRules) + err = c.updateClusterRoles(ctx, sharedClusterRole, newPolicyRules) } return err } -func (c *ClusterRoleAggregationController) applyClusterRoles(name string, newPolicyRules []rbacv1.PolicyRule) error { +func (c *ClusterRoleAggregationController) applyClusterRoles(ctx context.Context, name string, newPolicyRules []rbacv1.PolicyRule) error { clusterRoleApply := rbacv1ac.ClusterRole(name). WithRules(toApplyPolicyRules(newPolicyRules)...) opts := metav1.ApplyOptions{FieldManager: "clusterrole-aggregation-controller", Force: true} - _, err := c.clusterRoleClient.ClusterRoles().Apply(context.TODO(), clusterRoleApply, opts) + _, err := c.clusterRoleClient.ClusterRoles().Apply(ctx, clusterRoleApply, opts) return err } -func (c *ClusterRoleAggregationController) updateClusterRoles(sharedClusterRole *rbacv1.ClusterRole, newPolicyRules []rbacv1.PolicyRule) error { +func (c *ClusterRoleAggregationController) updateClusterRoles(ctx context.Context, sharedClusterRole *rbacv1.ClusterRole, newPolicyRules []rbacv1.PolicyRule) error { clusterRole := sharedClusterRole.DeepCopy() clusterRole.Rules = nil for _, rule := range newPolicyRules { clusterRole.Rules = append(clusterRole.Rules, *rule.DeepCopy()) } - _, err := c.clusterRoleClient.ClusterRoles().Update(context.TODO(), clusterRole, metav1.UpdateOptions{}) + _, err := c.clusterRoleClient.ClusterRoles().Update(ctx, clusterRole, metav1.UpdateOptions{}) return err } @@ -187,37 +187,37 @@ func ruleExists(haystack []rbacv1.PolicyRule, needle rbacv1.PolicyRule) bool { } // Run starts the controller and blocks until stopCh is closed. -func (c *ClusterRoleAggregationController) Run(workers int, stopCh <-chan struct{}) { +func (c *ClusterRoleAggregationController) Run(ctx context.Context, workers int) { defer utilruntime.HandleCrash() defer c.queue.ShutDown() klog.Infof("Starting ClusterRoleAggregator") defer klog.Infof("Shutting down ClusterRoleAggregator") - if !cache.WaitForNamedCacheSync("ClusterRoleAggregator", stopCh, c.clusterRolesSynced) { + if !cache.WaitForNamedCacheSync("ClusterRoleAggregator", ctx.Done(), c.clusterRolesSynced) { return } for i := 0; i < workers; i++ { - go wait.Until(c.runWorker, time.Second, stopCh) + go wait.UntilWithContext(ctx, c.runWorker, time.Second) } - <-stopCh + <-ctx.Done() } -func (c *ClusterRoleAggregationController) runWorker() { - for c.processNextWorkItem() { +func (c *ClusterRoleAggregationController) runWorker(ctx context.Context) { + for c.processNextWorkItem(ctx) { } } -func (c *ClusterRoleAggregationController) processNextWorkItem() bool { +func (c *ClusterRoleAggregationController) processNextWorkItem(ctx context.Context) bool { dsKey, quit := c.queue.Get() if quit { return false } defer c.queue.Done(dsKey) - err := c.syncHandler(dsKey.(string)) + err := c.syncHandler(ctx, dsKey.(string)) if err == nil { c.queue.Forget(dsKey) return true diff --git a/pkg/controller/clusterroleaggregation/clusterroleaggregation_controller_test.go b/pkg/controller/clusterroleaggregation/clusterroleaggregation_controller_test.go index 2895816d02f..49687d76fdf 100644 --- a/pkg/controller/clusterroleaggregation/clusterroleaggregation_controller_test.go +++ b/pkg/controller/clusterroleaggregation/clusterroleaggregation_controller_test.go @@ -17,6 +17,7 @@ limitations under the License. package clusterroleaggregation import ( + "context" "testing" rbacv1 "k8s.io/api/rbac/v1" @@ -181,7 +182,7 @@ func TestSyncClusterRole(t *testing.T) { clusterRoleClient: fakeClient.RbacV1(), clusterRoleLister: rbaclisters.NewClusterRoleLister(indexer), } - err := c.syncClusterRole(test.clusterRoleToSync) + err := c.syncClusterRole(context.TODO(), test.clusterRoleToSync) if err != nil { t.Fatal(err) }