diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index b90a742adb7..c53be756e44 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -358,6 +358,7 @@ func NewControllerInitializers() map[string]InitFunc { controllers["persistentvolume-binder"] = startPersistentVolumeBinderController controllers["attachdetach"] = startAttachDetachController controllers["persistentvolume-expander"] = startVolumeExpandController + controllers["clusterrole-aggregation"] = startClusterRoleAggregrationController return controllers } diff --git a/cmd/kube-controller-manager/app/rbac.go b/cmd/kube-controller-manager/app/rbac.go new file mode 100644 index 00000000000..b49d3403fe0 --- /dev/null +++ b/cmd/kube-controller-manager/app/rbac.go @@ -0,0 +1,33 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package app + +import ( + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/kubernetes/pkg/controller/clusterroleaggregation" +) + +func startClusterRoleAggregrationController(ctx ControllerContext) (bool, error) { + if !ctx.AvailableResources[schema.GroupVersionResource{Group: "rbac.authorization.k8s.io", Version: "v1", Resource: "clusterroles"}] { + return false, nil + } + go clusterroleaggregation.NewClusterRoleAggregation( + ctx.InformerFactory.Rbac().V1().ClusterRoles(), + ctx.ClientBuilder.ClientOrDie("clusterrole-aggregation-controller").RbacV1(), + ).Run(5, ctx.Stop) + return true, nil +} diff --git a/hack/.golint_failures b/hack/.golint_failures index 4a31103c218..312e0fa9240 100644 --- a/hack/.golint_failures +++ b/hack/.golint_failures @@ -96,6 +96,7 @@ pkg/controller/certificates pkg/controller/certificates/approver pkg/controller/certificates/signer pkg/controller/cloud +pkg/controller/clusterroleaggregation pkg/controller/cronjob pkg/controller/daemon pkg/controller/daemon/util diff --git a/pkg/controller/clusterroleaggregation/BUILD b/pkg/controller/clusterroleaggregation/BUILD new file mode 100644 index 00000000000..94a93af996d --- /dev/null +++ b/pkg/controller/clusterroleaggregation/BUILD @@ -0,0 +1,57 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = ["clusterroleaggregation_controller.go"], + importpath = "k8s.io/kubernetes/pkg/controller/clusterroleaggregation", + visibility = ["//visibility:public"], + deps = [ + "//pkg/controller:go_default_library", + "//vendor/github.com/golang/glog:go_default_library", + "//vendor/k8s.io/api/rbac/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//vendor/k8s.io/client-go/informers/rbac/v1:go_default_library", + "//vendor/k8s.io/client-go/kubernetes/typed/rbac/v1:go_default_library", + "//vendor/k8s.io/client-go/listers/rbac/v1:go_default_library", + "//vendor/k8s.io/client-go/tools/cache:go_default_library", + "//vendor/k8s.io/client-go/util/workqueue:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) + +go_test( + name = "go_default_test", + srcs = ["clusterroleaggregation_controller_test.go"], + importpath = "k8s.io/kubernetes/pkg/controller/clusterroleaggregation", + library = ":go_default_library", + deps = [ + "//pkg/controller:go_default_library", + "//vendor/k8s.io/api/rbac/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/api/equality:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/util/diff:go_default_library", + "//vendor/k8s.io/client-go/kubernetes/fake:go_default_library", + "//vendor/k8s.io/client-go/listers/rbac/v1:go_default_library", + "//vendor/k8s.io/client-go/testing:go_default_library", + "//vendor/k8s.io/client-go/tools/cache:go_default_library", + ], +) diff --git a/pkg/controller/clusterroleaggregation/clusterroleaggregation_controller.go b/pkg/controller/clusterroleaggregation/clusterroleaggregation_controller.go new file mode 100644 index 00000000000..05879e0e681 --- /dev/null +++ b/pkg/controller/clusterroleaggregation/clusterroleaggregation_controller.go @@ -0,0 +1,213 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package clusterroleaggregation + +import ( + "fmt" + "sort" + "time" + + "github.com/golang/glog" + + rbacv1 "k8s.io/api/rbac/v1" + "k8s.io/apimachinery/pkg/api/equality" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + rbacinformers "k8s.io/client-go/informers/rbac/v1" + rbacclient "k8s.io/client-go/kubernetes/typed/rbac/v1" + rbaclisters "k8s.io/client-go/listers/rbac/v1" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/kubernetes/pkg/controller" +) + +// ClusterRoleAggregationController is a controller to combine cluster roles +type ClusterRoleAggregationController struct { + clusterRoleClient rbacclient.ClusterRolesGetter + clusterRoleLister rbaclisters.ClusterRoleLister + clusterRolesSynced cache.InformerSynced + + syncHandler func(key string) error + queue workqueue.RateLimitingInterface +} + +// NewClusterRoleAggregation creates a new controller +func NewClusterRoleAggregation(clusterRoleInformer rbacinformers.ClusterRoleInformer, clusterRoleClient rbacclient.ClusterRolesGetter) *ClusterRoleAggregationController { + c := &ClusterRoleAggregationController{ + clusterRoleClient: clusterRoleClient, + clusterRoleLister: clusterRoleInformer.Lister(), + clusterRolesSynced: clusterRoleInformer.Informer().HasSynced, + + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ClusterRoleAggregator"), + } + c.syncHandler = c.syncClusterRole + + clusterRoleInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + c.enqueue() + }, + UpdateFunc: func(old, cur interface{}) { + c.enqueue() + }, + DeleteFunc: func(uncast interface{}) { + c.enqueue() + }, + }) + return c +} + +func (c *ClusterRoleAggregationController) syncClusterRole(key string) error { + _, name, err := cache.SplitMetaNamespaceKey(key) + if err != nil { + return err + } + sharedClusterRole, err := c.clusterRoleLister.Get(name) + if errors.IsNotFound(err) { + return nil + } + if err != nil { + return err + } + if sharedClusterRole.AggregationRule == nil { + return nil + } + + newPolicyRules := []rbacv1.PolicyRule{} + for i := range sharedClusterRole.AggregationRule.ClusterRoleSelectors { + selector := sharedClusterRole.AggregationRule.ClusterRoleSelectors[i] + runtimeLabelSelector, err := metav1.LabelSelectorAsSelector(&selector) + if err != nil { + return err + } + clusterRoles, err := c.clusterRoleLister.List(runtimeLabelSelector) + if err != nil { + return err + } + sort.Sort(byName(clusterRoles)) + + for i := range clusterRoles { + if clusterRoles[i].Name == sharedClusterRole.Name { + continue + } + + for j := range clusterRoles[i].Rules { + currRule := clusterRoles[i].Rules[j] + if !ruleExists(newPolicyRules, currRule) { + newPolicyRules = append(newPolicyRules, currRule) + } + } + } + } + + if equality.Semantic.DeepEqual(newPolicyRules, sharedClusterRole.Rules) { + return nil + } + + // we need to update + clusterRole := sharedClusterRole.DeepCopy() + clusterRole.Rules = nil + for _, rule := range newPolicyRules { + clusterRole.Rules = append(clusterRole.Rules, *rule.DeepCopy()) + } + _, err = c.clusterRoleClient.ClusterRoles().Update(clusterRole) + + return err +} + +func ruleExists(haystack []rbacv1.PolicyRule, needle rbacv1.PolicyRule) bool { + for _, curr := range haystack { + if equality.Semantic.DeepEqual(curr, needle) { + return true + } + } + return false +} + +// Run starts the controller and blocks until stopCh is closed. +func (c *ClusterRoleAggregationController) Run(workers int, stopCh <-chan struct{}) { + defer utilruntime.HandleCrash() + defer c.queue.ShutDown() + + glog.Infof("Starting ClusterRoleAggregator") + defer glog.Infof("Shutting down ClusterRoleAggregator") + + if !controller.WaitForCacheSync("ClusterRoleAggregator", stopCh, c.clusterRolesSynced) { + return + } + + for i := 0; i < workers; i++ { + go wait.Until(c.runWorker, time.Second, stopCh) + } + + <-stopCh +} + +func (c *ClusterRoleAggregationController) runWorker() { + for c.processNextWorkItem() { + } +} + +func (c *ClusterRoleAggregationController) processNextWorkItem() bool { + dsKey, quit := c.queue.Get() + if quit { + return false + } + defer c.queue.Done(dsKey) + + err := c.syncHandler(dsKey.(string)) + if err == nil { + c.queue.Forget(dsKey) + return true + } + + utilruntime.HandleError(fmt.Errorf("%v failed with : %v", dsKey, err)) + c.queue.AddRateLimited(dsKey) + + return true +} + +func (c *ClusterRoleAggregationController) enqueue() { + // this is unusual, but since the set of all clusterroles is small and we don't know the dependency + // graph, just queue up every thing each time. This allows errors to be selectively retried if there + // is a problem updating a single role + allClusterRoles, err := c.clusterRoleLister.List(labels.Everything()) + if err != nil { + utilruntime.HandleError(fmt.Errorf("Couldn't list all objects %v", err)) + return + } + for _, clusterRole := range allClusterRoles { + // only queue ones that we may need to aggregate + if clusterRole.AggregationRule == nil { + continue + } + key, err := controller.KeyFunc(clusterRole) + if err != nil { + utilruntime.HandleError(fmt.Errorf("Couldn't get key for object %#v: %v", clusterRole, err)) + return + } + c.queue.Add(key) + } +} + +type byName []*rbacv1.ClusterRole + +func (a byName) Len() int { return len(a) } +func (a byName) Swap(i, j int) { a[i], a[j] = a[j], a[i] } +func (a byName) Less(i, j int) bool { return a[i].Name < a[j].Name } diff --git a/pkg/controller/clusterroleaggregation/clusterroleaggregation_controller_test.go b/pkg/controller/clusterroleaggregation/clusterroleaggregation_controller_test.go new file mode 100644 index 00000000000..de007cdd3f3 --- /dev/null +++ b/pkg/controller/clusterroleaggregation/clusterroleaggregation_controller_test.go @@ -0,0 +1,182 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package clusterroleaggregation + +import ( + "testing" + + rbacv1 "k8s.io/api/rbac/v1" + "k8s.io/apimachinery/pkg/api/equality" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/util/diff" + fakeclient "k8s.io/client-go/kubernetes/fake" + rbaclisters "k8s.io/client-go/listers/rbac/v1" + clienttesting "k8s.io/client-go/testing" + "k8s.io/client-go/tools/cache" + "k8s.io/kubernetes/pkg/controller" +) + +func TestSyncClusterRole(t *testing.T) { + hammerRules := func() []rbacv1.PolicyRule { + return []rbacv1.PolicyRule{ + {Verbs: []string{"hammer"}, Resources: []string{"nails"}}, + {Verbs: []string{"hammer"}, Resources: []string{"wedges"}}, + } + } + chiselRules := func() []rbacv1.PolicyRule { + return []rbacv1.PolicyRule{ + {Verbs: []string{"chisel"}, Resources: []string{"mortises"}}, + } + } + sawRules := func() []rbacv1.PolicyRule { + return []rbacv1.PolicyRule{ + {Verbs: []string{"saw"}, Resources: []string{"boards"}}, + } + } + role := func(name string, labels map[string]string, rules []rbacv1.PolicyRule) *rbacv1.ClusterRole { + return &rbacv1.ClusterRole{ + ObjectMeta: metav1.ObjectMeta{Name: name, Labels: labels}, + Rules: rules, + } + } + combinedRole := func(selectors []map[string]string, rules ...[]rbacv1.PolicyRule) *rbacv1.ClusterRole { + ret := &rbacv1.ClusterRole{ + ObjectMeta: metav1.ObjectMeta{Name: "combined"}, + AggregationRule: &rbacv1.AggregationRule{}, + } + for _, selector := range selectors { + ret.AggregationRule.ClusterRoleSelectors = append(ret.AggregationRule.ClusterRoleSelectors, + metav1.LabelSelector{MatchLabels: selector}) + } + for _, currRules := range rules { + ret.Rules = append(ret.Rules, currRules...) + } + return ret + } + + tests := []struct { + name string + startingClusterRoles []*rbacv1.ClusterRole + clusterRoleToSync string + expectedClusterRole *rbacv1.ClusterRole + }{ + { + name: "remove dead rules", + startingClusterRoles: []*rbacv1.ClusterRole{ + role("hammer", map[string]string{"foo": "bar"}, hammerRules()), + combinedRole([]map[string]string{{"foo": "bar"}}, sawRules()), + }, + clusterRoleToSync: "combined", + expectedClusterRole: combinedRole([]map[string]string{{"foo": "bar"}}, hammerRules()), + }, + { + name: "strip rules", + startingClusterRoles: []*rbacv1.ClusterRole{ + role("hammer", map[string]string{"foo": "not-bar"}, hammerRules()), + combinedRole([]map[string]string{{"foo": "bar"}}, hammerRules()), + }, + clusterRoleToSync: "combined", + expectedClusterRole: combinedRole([]map[string]string{{"foo": "bar"}}), + }, + { + name: "select properly and put in order", + startingClusterRoles: []*rbacv1.ClusterRole{ + role("hammer", map[string]string{"foo": "bar"}, hammerRules()), + role("chisel", map[string]string{"foo": "bar"}, chiselRules()), + role("saw", map[string]string{"foo": "not-bar"}, sawRules()), + combinedRole([]map[string]string{{"foo": "bar"}}), + }, + clusterRoleToSync: "combined", + expectedClusterRole: combinedRole([]map[string]string{{"foo": "bar"}}, chiselRules(), hammerRules()), + }, + { + name: "select properly with multiple selectors", + startingClusterRoles: []*rbacv1.ClusterRole{ + role("hammer", map[string]string{"foo": "bar"}, hammerRules()), + role("chisel", map[string]string{"foo": "bar"}, chiselRules()), + role("saw", map[string]string{"foo": "not-bar"}, sawRules()), + combinedRole([]map[string]string{{"foo": "bar"}, {"foo": "not-bar"}}), + }, + clusterRoleToSync: "combined", + expectedClusterRole: combinedRole([]map[string]string{{"foo": "bar"}, {"foo": "not-bar"}}, chiselRules(), hammerRules(), sawRules()), + }, + { + name: "select properly remove duplicates", + startingClusterRoles: []*rbacv1.ClusterRole{ + role("hammer", map[string]string{"foo": "bar"}, hammerRules()), + role("chisel", map[string]string{"foo": "bar"}, chiselRules()), + role("saw", map[string]string{"foo": "bar"}, sawRules()), + role("other-saw", map[string]string{"foo": "not-bar"}, sawRules()), + combinedRole([]map[string]string{{"foo": "bar"}, {"foo": "not-bar"}}), + }, + clusterRoleToSync: "combined", + expectedClusterRole: combinedRole([]map[string]string{{"foo": "bar"}, {"foo": "not-bar"}}, chiselRules(), hammerRules(), sawRules()), + }, + { + name: "no diff skip", + startingClusterRoles: []*rbacv1.ClusterRole{ + role("hammer", map[string]string{"foo": "bar"}, hammerRules()), + combinedRole([]map[string]string{{"foo": "bar"}}, hammerRules()), + }, + clusterRoleToSync: "combined", + expectedClusterRole: nil, + }} + + for _, test := range tests { + t.Run(test.name, func(t *testing.T) { + indexer := cache.NewIndexer(controller.KeyFunc, cache.Indexers{}) + objs := []runtime.Object{} + for _, obj := range test.startingClusterRoles { + objs = append(objs, obj) + indexer.Add(obj) + } + fakeClient := fakeclient.NewSimpleClientset(objs...) + c := ClusterRoleAggregationController{ + clusterRoleClient: fakeClient.RbacV1(), + clusterRoleLister: rbaclisters.NewClusterRoleLister(indexer), + } + err := c.syncClusterRole(test.clusterRoleToSync) + if err != nil { + t.Fatal(err) + } + + if test.expectedClusterRole == nil { + if len(fakeClient.Actions()) != 0 { + t.Fatalf("unexpected actions %#v", fakeClient.Actions()) + } + return + } + if len(fakeClient.Actions()) != 1 { + t.Fatalf("unexpected actions %#v", fakeClient.Actions()) + } + + action := fakeClient.Actions()[0] + if !action.Matches("update", "clusterroles") { + t.Fatalf("unexpected action %#v", action) + } + updateAction, ok := action.(clienttesting.UpdateAction) + if !ok { + t.Fatalf("unexpected action %#v", action) + } + if !equality.Semantic.DeepEqual(updateAction.GetObject().(*rbacv1.ClusterRole), test.expectedClusterRole) { + t.Fatalf("%v", diff.ObjectDiff(test.expectedClusterRole, updateAction.GetObject().(*rbacv1.ClusterRole))) + + } + }) + } +}