Wire contexts to RBAC controllers

This commit is contained in:
Mike Dame 2021-04-22 14:35:26 -04:00
parent 9b45983d3c
commit 3f0b6d390c
3 changed files with 20 additions and 19 deletions

View File

@ -31,6 +31,6 @@ func startClusterRoleAggregrationController(ctx context.Context, controllerConte
go clusterroleaggregation.NewClusterRoleAggregation(
controllerContext.InformerFactory.Rbac().V1().ClusterRoles(),
controllerContext.ClientBuilder.ClientOrDie("clusterrole-aggregation-controller").RbacV1(),
).Run(5, ctx.Done())
).Run(ctx, 5)
return nil, true, nil
}

View File

@ -49,7 +49,7 @@ type ClusterRoleAggregationController struct {
clusterRoleLister rbaclisters.ClusterRoleLister
clusterRolesSynced cache.InformerSynced
syncHandler func(key string) error
syncHandler func(ctx context.Context, key string) error
queue workqueue.RateLimitingInterface
}
@ -78,7 +78,7 @@ func NewClusterRoleAggregation(clusterRoleInformer rbacinformers.ClusterRoleInfo
return c
}
func (c *ClusterRoleAggregationController) syncClusterRole(key string) error {
func (c *ClusterRoleAggregationController) syncClusterRole(ctx context.Context, key string) error {
_, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
@ -126,36 +126,36 @@ func (c *ClusterRoleAggregationController) syncClusterRole(key string) error {
}
if utilfeature.DefaultFeatureGate.Enabled(features.ServerSideApply) {
err = c.applyClusterRoles(sharedClusterRole.Name, newPolicyRules)
err = c.applyClusterRoles(ctx, sharedClusterRole.Name, newPolicyRules)
if errors.IsUnsupportedMediaType(err) { // TODO: Remove this fallback at least one release after ServerSideApply GA
// When Server Side Apply is not enabled, fallback to Update. This is required when running
// 1.21 since api-server can be 1.20 during the upgrade/downgrade.
// Since Server Side Apply is enabled by default in Beta, this fallback only kicks in
// if the feature has been disabled using its feature flag.
err = c.updateClusterRoles(sharedClusterRole, newPolicyRules)
err = c.updateClusterRoles(ctx, sharedClusterRole, newPolicyRules)
}
} else {
err = c.updateClusterRoles(sharedClusterRole, newPolicyRules)
err = c.updateClusterRoles(ctx, sharedClusterRole, newPolicyRules)
}
return err
}
func (c *ClusterRoleAggregationController) applyClusterRoles(name string, newPolicyRules []rbacv1.PolicyRule) error {
func (c *ClusterRoleAggregationController) applyClusterRoles(ctx context.Context, name string, newPolicyRules []rbacv1.PolicyRule) error {
clusterRoleApply := rbacv1ac.ClusterRole(name).
WithRules(toApplyPolicyRules(newPolicyRules)...)
opts := metav1.ApplyOptions{FieldManager: "clusterrole-aggregation-controller", Force: true}
_, err := c.clusterRoleClient.ClusterRoles().Apply(context.TODO(), clusterRoleApply, opts)
_, err := c.clusterRoleClient.ClusterRoles().Apply(ctx, clusterRoleApply, opts)
return err
}
func (c *ClusterRoleAggregationController) updateClusterRoles(sharedClusterRole *rbacv1.ClusterRole, newPolicyRules []rbacv1.PolicyRule) error {
func (c *ClusterRoleAggregationController) updateClusterRoles(ctx context.Context, sharedClusterRole *rbacv1.ClusterRole, newPolicyRules []rbacv1.PolicyRule) error {
clusterRole := sharedClusterRole.DeepCopy()
clusterRole.Rules = nil
for _, rule := range newPolicyRules {
clusterRole.Rules = append(clusterRole.Rules, *rule.DeepCopy())
}
_, err := c.clusterRoleClient.ClusterRoles().Update(context.TODO(), clusterRole, metav1.UpdateOptions{})
_, err := c.clusterRoleClient.ClusterRoles().Update(ctx, clusterRole, metav1.UpdateOptions{})
return err
}
@ -187,37 +187,37 @@ func ruleExists(haystack []rbacv1.PolicyRule, needle rbacv1.PolicyRule) bool {
}
// Run starts the controller and blocks until stopCh is closed.
func (c *ClusterRoleAggregationController) Run(workers int, stopCh <-chan struct{}) {
func (c *ClusterRoleAggregationController) Run(ctx context.Context, workers int) {
defer utilruntime.HandleCrash()
defer c.queue.ShutDown()
klog.Infof("Starting ClusterRoleAggregator")
defer klog.Infof("Shutting down ClusterRoleAggregator")
if !cache.WaitForNamedCacheSync("ClusterRoleAggregator", stopCh, c.clusterRolesSynced) {
if !cache.WaitForNamedCacheSync("ClusterRoleAggregator", ctx.Done(), c.clusterRolesSynced) {
return
}
for i := 0; i < workers; i++ {
go wait.Until(c.runWorker, time.Second, stopCh)
go wait.UntilWithContext(ctx, c.runWorker, time.Second)
}
<-stopCh
<-ctx.Done()
}
func (c *ClusterRoleAggregationController) runWorker() {
for c.processNextWorkItem() {
func (c *ClusterRoleAggregationController) runWorker(ctx context.Context) {
for c.processNextWorkItem(ctx) {
}
}
func (c *ClusterRoleAggregationController) processNextWorkItem() bool {
func (c *ClusterRoleAggregationController) processNextWorkItem(ctx context.Context) bool {
dsKey, quit := c.queue.Get()
if quit {
return false
}
defer c.queue.Done(dsKey)
err := c.syncHandler(dsKey.(string))
err := c.syncHandler(ctx, dsKey.(string))
if err == nil {
c.queue.Forget(dsKey)
return true

View File

@ -17,6 +17,7 @@ limitations under the License.
package clusterroleaggregation
import (
"context"
"testing"
rbacv1 "k8s.io/api/rbac/v1"
@ -181,7 +182,7 @@ func TestSyncClusterRole(t *testing.T) {
clusterRoleClient: fakeClient.RbacV1(),
clusterRoleLister: rbaclisters.NewClusterRoleLister(indexer),
}
err := c.syncClusterRole(test.clusterRoleToSync)
err := c.syncClusterRole(context.TODO(), test.clusterRoleToSync)
if err != nil {
t.Fatal(err)
}