diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/autoregister/autoregister_controller.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/autoregister/autoregister_controller.go new file mode 100644 index 00000000000..d6167fae2dc --- /dev/null +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/autoregister/autoregister_controller.go @@ -0,0 +1,249 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package autoregister + +import ( + "fmt" + "reflect" + "sync" + "time" + + "github.com/golang/glog" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/conversion" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + + "k8s.io/kube-aggregator/pkg/apis/apiregistration" + apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset/typed/apiregistration/internalversion" + informers "k8s.io/kube-aggregator/pkg/client/informers/internalversion/apiregistration/internalversion" + listers "k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion" +) + +const ( + AutoRegisterManagedLabel = "kube-aggregator.kubernetes.io/automanaged" +) + +var ( + cloner = conversion.NewCloner() +) + +// AutoAPIServiceRegistration is an interface which callers can re-declare locally and properly cast to for +// adding and removing APIServices +type AutoAPIServiceRegistration interface { + // AddAPIServiceToSync adds an API service to auto-register. + AddAPIServiceToSync(in *apiregistration.APIService) + // RemoveAPIServiceToSync removes an API service to auto-register. + RemoveAPIServiceToSync(name string) +} + +// autoRegisterController is used to keep a particular set of APIServices present in the API. It is useful +// for cases where you want to auto-register APIs like TPRs or groups from the core kube-apiserver +type autoRegisterController struct { + apiServiceLister listers.APIServiceLister + apiServiceSynced cache.InformerSynced + apiServiceClient apiregistrationclient.APIServicesGetter + + apiServicesToSyncLock sync.RWMutex + apiServicesToSync map[string]*apiregistration.APIService + + syncHandler func(apiServiceName string) error + + // queue is where incoming work is placed to de-dup and to allow "easy" rate limited requeues on errors + queue workqueue.RateLimitingInterface +} + +func NewAutoRegisterController(apiServiceInformer informers.APIServiceInformer, apiServiceClient apiregistrationclient.APIServicesGetter) *autoRegisterController { + c := &autoRegisterController{ + apiServiceLister: apiServiceInformer.Lister(), + apiServiceSynced: apiServiceInformer.Informer().HasSynced, + apiServiceClient: apiServiceClient, + apiServicesToSync: map[string]*apiregistration.APIService{}, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "autoregister"), + } + c.syncHandler = c.checkAPIService + + apiServiceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + cast := obj.(*apiregistration.APIService) + c.queue.Add(cast.Name) + }, + UpdateFunc: func(_, obj interface{}) { + cast := obj.(*apiregistration.APIService) + c.queue.Add(cast.Name) + }, + DeleteFunc: func(obj interface{}) { + cast, ok := obj.(*apiregistration.APIService) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + glog.V(2).Infof("Couldn't get object from tombstone %#v", obj) + return + } + cast, ok = tombstone.Obj.(*apiregistration.APIService) + if !ok { + glog.V(2).Infof("Tombstone contained unexpected object: %#v", obj) + return + } + } + c.queue.Add(cast.Name) + }, + }) + + return c +} + +func (c *autoRegisterController) Run(threadiness int, stopCh chan struct{}) { + // don't let panics crash the process + defer utilruntime.HandleCrash() + // make sure the work queue is shutdown which will trigger workers to end + defer c.queue.ShutDown() + + glog.Infof("Starting autoregister controller") + defer glog.Infof("Shutting down autoregister controller") + + // wait for your secondary caches to fill before starting your work + if !cache.WaitForCacheSync(stopCh, c.apiServiceSynced) { + return + } + + // start up your worker threads based on threadiness. Some controllers have multiple kinds of workers + for i := 0; i < threadiness; i++ { + // runWorker will loop until "something bad" happens. The .Until will then rekick the worker + // after one second + go wait.Until(c.runWorker, time.Second, stopCh) + } + + // wait until we're told to stop + <-stopCh +} + +func (c *autoRegisterController) runWorker() { + // hot loop until we're told to stop. processNextWorkItem will automatically wait until there's work + // available, so we don't worry about secondary waits + for c.processNextWorkItem() { + } +} + +// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit. +func (c *autoRegisterController) processNextWorkItem() bool { + // pull the next work item from queue. It should be a key we use to lookup something in a cache + key, quit := c.queue.Get() + if quit { + return false + } + // you always have to indicate to the queue that you've completed a piece of work + defer c.queue.Done(key) + + // do your work on the key. This method will contains your "do stuff" logic + err := c.syncHandler(key.(string)) + if err == nil { + // if you had no error, tell the queue to stop tracking history for your key. This will + // reset things like failure counts for per-item rate limiting + c.queue.Forget(key) + return true + } + + // there was a failure so be sure to report it. This method allows for pluggable error handling + // which can be used for things like cluster-monitoring + utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err)) + // since we failed, we should requeue the item to work on later. This method will add a backoff + // to avoid hotlooping on particular items (they're probably still not going to work right away) + // and overall controller protection (everything I've done is broken, this controller needs to + // calm down or it can starve other useful work) cases. + c.queue.AddRateLimited(key) + + return true +} + +func (c *autoRegisterController) checkAPIService(name string) error { + desired := c.GetAPIServiceToSync(name) + curr, err := c.apiServiceLister.Get(name) + + switch { + // we had a real error, just return it + case err != nil && !apierrors.IsNotFound(err): + return err + + // we don't have an entry and we don't want one + case apierrors.IsNotFound(err) && desired == nil: + return nil + + // we don't have an entry and we do want one + case apierrors.IsNotFound(err) && desired != nil: + _, err := c.apiServiceClient.APIServices().Create(desired) + return err + + // we aren't trying to manage this APIService. If the user removes the label, he's taken over management himself + case curr.Labels[AutoRegisterManagedLabel] != "true": + return nil + + // we have a spurious APIService that we're managing, delete it + case desired == nil: + return c.apiServiceClient.APIServices().Delete(curr.Name, nil) + + // if the specs already match, nothing for us to do + case reflect.DeepEqual(curr.Spec, desired.Spec): + return nil + } + + // we have an entry and we have a desired, now we deconflict. Only a few fields matter. + apiService := &apiregistration.APIService{} + if err := apiregistration.DeepCopy_apiregistration_APIService(curr, apiService, cloner); err != nil { + return err + } + apiService.Spec = desired.Spec + _, err = c.apiServiceClient.APIServices().Update(apiService) + return err +} + +func (c *autoRegisterController) GetAPIServiceToSync(name string) *apiregistration.APIService { + c.apiServicesToSyncLock.RLock() + defer c.apiServicesToSyncLock.RUnlock() + + return c.apiServicesToSync[name] +} + +func (c *autoRegisterController) AddAPIServiceToSync(in *apiregistration.APIService) { + c.apiServicesToSyncLock.Lock() + defer c.apiServicesToSyncLock.Unlock() + + apiService := &apiregistration.APIService{} + if err := apiregistration.DeepCopy_apiregistration_APIService(in, apiService, cloner); err != nil { + // this shouldn't happen + utilruntime.HandleError(err) + return + } + if apiService.Labels == nil { + apiService.Labels = map[string]string{} + } + apiService.Labels[AutoRegisterManagedLabel] = "true" + + c.apiServicesToSync[apiService.Name] = apiService + c.queue.Add(apiService.Name) +} + +func (c *autoRegisterController) RemoveAPIServiceToSync(name string) { + c.apiServicesToSyncLock.Lock() + defer c.apiServicesToSyncLock.Unlock() + + delete(c.apiServicesToSync, name) + c.queue.Add(name) +} diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/autoregister/autoregister_controller_test.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/autoregister/autoregister_controller_test.go new file mode 100644 index 00000000000..f34e0b530a5 --- /dev/null +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/autoregister/autoregister_controller_test.go @@ -0,0 +1,343 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package autoregister + +import ( + "fmt" + "reflect" + "testing" + "time" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apimachinery/pkg/watch" + core "k8s.io/client-go/testing" + + "k8s.io/kube-aggregator/pkg/apis/apiregistration" + "k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset" + "k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset/fake" + informers "k8s.io/kube-aggregator/pkg/client/informers/internalversion" +) + +func alwaysReady() bool { return true } + +func waitForNothing(startTime time.Time, client *fake.Clientset) (bool, error) { + if len(client.Actions()) > 0 { + return false, fmt.Errorf("unexpected action: %v", client.Actions()) + } + if time.Now().After(startTime.Add(3 * time.Second)) { + return true, nil + } + return false, nil +} + +func waitForCreate(name string) func(startTime time.Time, client *fake.Clientset) (bool, error) { + return func(startTime time.Time, client *fake.Clientset) (bool, error) { + if len(client.Actions()) == 0 { + return false, nil + } + if len(client.Actions()) > 1 { + return false, fmt.Errorf("unexpected action: %v", client.Actions()) + } + + action := client.Actions()[0] + createAction, ok := action.(core.CreateAction) + if !ok { + return false, fmt.Errorf("unexpected action: %v", client.Actions()) + } + apiService := createAction.GetObject().(*apiregistration.APIService) + if apiService.Name != name || apiService.Labels[AutoRegisterManagedLabel] != "true" { + return false, fmt.Errorf("bad name or label %v", createAction) + } + + return true, nil + } +} + +func waitForUpdate(name string) func(startTime time.Time, client *fake.Clientset) (bool, error) { + return func(startTime time.Time, client *fake.Clientset) (bool, error) { + if len(client.Actions()) == 0 { + return false, nil + } + if len(client.Actions()) > 1 { + return false, fmt.Errorf("unexpected action: %v", client.Actions()) + } + + action := client.Actions()[0] + updateAction, ok := action.(core.UpdateAction) + if !ok { + return false, fmt.Errorf("unexpected action: %v", client.Actions()) + } + apiService := updateAction.GetObject().(*apiregistration.APIService) + if apiService.Name != name || apiService.Labels[AutoRegisterManagedLabel] != "true" || apiService.Spec.Group != "" { + return false, fmt.Errorf("bad name, label, or group %v", updateAction) + } + + return true, nil + } +} + +func waitForDelete(name string) func(startTime time.Time, client *fake.Clientset) (bool, error) { + return func(startTime time.Time, client *fake.Clientset) (bool, error) { + if len(client.Actions()) == 0 { + return false, nil + } + + // tolerate delete being called multiple times. This happens if the delete fails on missing resource which + // happens on an unsynced cache + for _, action := range client.Actions() { + deleteAction, ok := action.(core.DeleteAction) + if !ok { + return false, fmt.Errorf("unexpected action: %v", client.Actions()) + } + if deleteAction.GetName() != name { + return false, fmt.Errorf("bad name %v", deleteAction) + } + } + + return true, nil + } +} + +func TestCheckAPIService(t *testing.T) { + tests := []struct { + name string + + steps []func(c AutoAPIServiceRegistration, fakeWatch *watch.FakeWatcher, client internalclientset.Interface) + expectedResults []func(startTime time.Time, client *fake.Clientset) (bool, error) + }{ + { + name: "do nothing", + steps: []func(c AutoAPIServiceRegistration, fakeWatch *watch.FakeWatcher, client internalclientset.Interface){ + // adding an API service which isn't auto-managed does nothing + func(c AutoAPIServiceRegistration, fakeWatch *watch.FakeWatcher, client internalclientset.Interface) { + fakeWatch.Add(&apiregistration.APIService{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) + }, + // removing an auto-sync that doesn't exist should do nothing + func(c AutoAPIServiceRegistration, fakeWatch *watch.FakeWatcher, client internalclientset.Interface) { + c.RemoveAPIServiceToSync("bar") + }, + }, + expectedResults: []func(startTime time.Time, client *fake.Clientset) (bool, error){ + waitForNothing, + waitForNothing, + }, + }, + { + name: "simple create and delete", + steps: []func(c AutoAPIServiceRegistration, fakeWatch *watch.FakeWatcher, client internalclientset.Interface){ + // adding one to auto-register should create + func(c AutoAPIServiceRegistration, fakeWatch *watch.FakeWatcher, client internalclientset.Interface) { + c.AddAPIServiceToSync(&apiregistration.APIService{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) + }, + // adding the same item again shouldn't do anything + func(c AutoAPIServiceRegistration, fakeWatch *watch.FakeWatcher, client internalclientset.Interface) { + c.AddAPIServiceToSync(&apiregistration.APIService{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) + }, + // removing entry should delete the API service since its managed + func(c AutoAPIServiceRegistration, fakeWatch *watch.FakeWatcher, client internalclientset.Interface) { + c.RemoveAPIServiceToSync("foo") + }, + }, + expectedResults: []func(startTime time.Time, client *fake.Clientset) (bool, error){ + waitForCreate("foo"), + waitForNothing, + waitForDelete("foo"), + }, + }, + { + name: "create, user manage, then delete", + steps: []func(c AutoAPIServiceRegistration, fakeWatch *watch.FakeWatcher, client internalclientset.Interface){ + // adding one to auto-register should create + func(c AutoAPIServiceRegistration, fakeWatch *watch.FakeWatcher, client internalclientset.Interface) { + c.AddAPIServiceToSync(&apiregistration.APIService{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) + }, + // adding an API service to take ownership shouldn't cause the controller to do anything + func(c AutoAPIServiceRegistration, fakeWatch *watch.FakeWatcher, client internalclientset.Interface) { + fakeWatch.Modify(&apiregistration.APIService{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) + }, + // removing entry should NOT delete the API service since its user owned + func(c AutoAPIServiceRegistration, fakeWatch *watch.FakeWatcher, client internalclientset.Interface) { + c.RemoveAPIServiceToSync("foo") + }, + }, + expectedResults: []func(startTime time.Time, client *fake.Clientset) (bool, error){ + waitForCreate("foo"), + waitForNothing, + waitForNothing, + }, + }, + { + name: "create managed apiservice without a matching request", + steps: []func(c AutoAPIServiceRegistration, fakeWatch *watch.FakeWatcher, client internalclientset.Interface){ + // adding an API service which isn't auto-managed does nothing + func(c AutoAPIServiceRegistration, fakeWatch *watch.FakeWatcher, client internalclientset.Interface) { + fakeWatch.Add(&apiregistration.APIService{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) + }, + // adding an API service which claims to be managed but isn't should be deleted + func(c AutoAPIServiceRegistration, fakeWatch *watch.FakeWatcher, client internalclientset.Interface) { + fakeWatch.Modify(&apiregistration.APIService{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Labels: map[string]string{AutoRegisterManagedLabel: "true"}, + }}) + }, + }, + expectedResults: []func(startTime time.Time, client *fake.Clientset) (bool, error){ + waitForNothing, + waitForDelete("foo"), + }, + }, + { + name: "modifying it should result in stomping", + steps: []func(c AutoAPIServiceRegistration, fakeWatch *watch.FakeWatcher, client internalclientset.Interface){ + // adding one to auto-register should create + func(c AutoAPIServiceRegistration, fakeWatch *watch.FakeWatcher, client internalclientset.Interface) { + c.AddAPIServiceToSync(&apiregistration.APIService{ObjectMeta: metav1.ObjectMeta{Name: "foo"}}) + }, + // updating a managed APIService should result in stomping it + func(c AutoAPIServiceRegistration, fakeWatch *watch.FakeWatcher, client internalclientset.Interface) { + fakeWatch.Modify(&apiregistration.APIService{ + ObjectMeta: metav1.ObjectMeta{ + Name: "foo", + Labels: map[string]string{AutoRegisterManagedLabel: "true"}, + }, + Spec: apiregistration.APIServiceSpec{ + Group: "something", + }, + }) + }, + }, + expectedResults: []func(startTime time.Time, client *fake.Clientset) (bool, error){ + waitForCreate("foo"), + waitForUpdate("foo"), + }, + }, + } + +NextTest: + for _, test := range tests { + client := fake.NewSimpleClientset() + informerFactory := informers.NewSharedInformerFactory(client, 0) + fakeWatch := watch.NewFake() + client.PrependWatchReactor("apiservices", core.DefaultWatchReactor(fakeWatch, nil)) + + c := NewAutoRegisterController(informerFactory.Apiregistration().InternalVersion().APIServices(), client.Apiregistration()) + + stopCh := make(chan struct{}) + go informerFactory.Start(stopCh) + go c.Run(3, stopCh) + + // wait for the initial sync to complete + err := wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) { + return c.apiServiceSynced(), nil + }) + if err != nil { + t.Errorf("%s %v", test.name, err) + close(stopCh) + continue NextTest + } + + for i, step := range test.steps { + client.ClearActions() + step(c, fakeWatch, client) + + startTime := time.Now() + err := wait.PollImmediate(10*time.Millisecond, 20*time.Second, func() (bool, error) { + return test.expectedResults[i](startTime, client) + }) + if err != nil { + t.Errorf("%s[%d] %v", test.name, i, err) + close(stopCh) + continue NextTest + } + + // make sure that any create/update/delete is propagated to the watch + for _, a := range client.Actions() { + switch action := a.(type) { + case core.CreateAction: + fakeWatch.Add(action.GetObject()) + metadata, err := meta.Accessor(action.GetObject()) + if err != nil { + t.Fatal(err) + } + err = wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) { + if _, err := c.apiServiceLister.Get(metadata.GetName()); err == nil { + return true, nil + } + return false, nil + }) + if err != nil { + t.Errorf("%s[%d] %v", test.name, i, err) + close(stopCh) + continue NextTest + } + + case core.DeleteAction: + obj, err := c.apiServiceLister.Get(action.GetName()) + if apierrors.IsNotFound(err) { + close(stopCh) + continue NextTest + } + if err != nil { + t.Fatal(err) + } + fakeWatch.Delete(obj) + err = wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) { + if _, err := c.apiServiceLister.Get(action.GetName()); apierrors.IsNotFound(err) { + return true, nil + } + return false, nil + }) + if err != nil { + t.Errorf("%s[%d] %v", test.name, i, err) + close(stopCh) + continue NextTest + } + + case core.UpdateAction: + fakeWatch.Modify(action.GetObject()) + metadata, err := meta.Accessor(action.GetObject()) + if err != nil { + t.Fatal(err) + } + err = wait.PollImmediate(10*time.Millisecond, 10*time.Second, func() (bool, error) { + obj, err := c.apiServiceLister.Get(metadata.GetName()) + if err != nil { + return false, err + } + if reflect.DeepEqual(obj, action.GetObject()) { + return true, nil + } + + return false, nil + }) + if err != nil { + t.Errorf("%s[%d] %v", test.name, i, err) + close(stopCh) + continue NextTest + } + + } + } + } + + close(stopCh) + } +} diff --git a/vendor/BUILD b/vendor/BUILD index 811973ee8ff..6a4198eb6de 100644 --- a/vendor/BUILD +++ b/vendor/BUILD @@ -16312,3 +16312,41 @@ filegroup( srcs = [":package-srcs"], tags = ["automanaged"], ) + +go_test( + name = "k8s.io/kube-aggregator/pkg/controllers/autoregister_test", + srcs = ["k8s.io/kube-aggregator/pkg/controllers/autoregister/autoregister_controller_test.go"], + library = ":k8s.io/kube-aggregator/pkg/controllers/autoregister", + tags = ["automanaged"], + deps = [ + "//vendor:k8s.io/apimachinery/pkg/api/errors", + "//vendor:k8s.io/apimachinery/pkg/api/meta", + "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", + "//vendor:k8s.io/apimachinery/pkg/util/wait", + "//vendor:k8s.io/apimachinery/pkg/watch", + "//vendor:k8s.io/client-go/testing", + "//vendor:k8s.io/kube-aggregator/pkg/apis/apiregistration", + "//vendor:k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset", + "//vendor:k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset/fake", + "//vendor:k8s.io/kube-aggregator/pkg/client/informers/internalversion", + ], +) + +go_library( + name = "k8s.io/kube-aggregator/pkg/controllers/autoregister", + srcs = ["k8s.io/kube-aggregator/pkg/controllers/autoregister/autoregister_controller.go"], + tags = ["automanaged"], + deps = [ + "//vendor:github.com/golang/glog", + "//vendor:k8s.io/apimachinery/pkg/api/errors", + "//vendor:k8s.io/apimachinery/pkg/conversion", + "//vendor:k8s.io/apimachinery/pkg/util/runtime", + "//vendor:k8s.io/apimachinery/pkg/util/wait", + "//vendor:k8s.io/client-go/tools/cache", + "//vendor:k8s.io/client-go/util/workqueue", + "//vendor:k8s.io/kube-aggregator/pkg/apis/apiregistration", + "//vendor:k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset/typed/apiregistration/internalversion", + "//vendor:k8s.io/kube-aggregator/pkg/client/informers/internalversion/apiregistration/internalversion", + "//vendor:k8s.io/kube-aggregator/pkg/client/listers/apiregistration/internalversion", + ], +)