diff --git a/pkg/master/thirdparty/BUILD b/pkg/master/thirdparty/BUILD index fbcc4ce05d2..44375722307 100644 --- a/pkg/master/thirdparty/BUILD +++ b/pkg/master/thirdparty/BUILD @@ -5,23 +5,32 @@ licenses(["notice"]) load( "@io_bazel_rules_go//go:def.bzl", "go_library", + "go_test", ) go_library( name = "go_default_library", - srcs = ["thirdparty.go"], + srcs = [ + "thirdparty.go", + "tprregistration_controller.go", + ], tags = ["automanaged"], deps = [ "//pkg/api:go_default_library", "//pkg/apis/extensions:go_default_library", + "//pkg/client/informers/informers_generated/internalversion/extensions/internalversion:go_default_library", + "//pkg/client/listers/extensions/internalversion:go_default_library", "//pkg/registry/extensions/rest:go_default_library", "//pkg/registry/extensions/thirdpartyresourcedata:go_default_library", "//pkg/registry/extensions/thirdpartyresourcedata/storage:go_default_library", "//vendor:github.com/golang/glog", "//vendor:k8s.io/apimachinery/pkg/api/meta", "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", + "//vendor:k8s.io/apimachinery/pkg/labels", "//vendor:k8s.io/apimachinery/pkg/runtime", "//vendor:k8s.io/apimachinery/pkg/runtime/schema", + "//vendor:k8s.io/apimachinery/pkg/util/runtime", + "//vendor:k8s.io/apimachinery/pkg/util/wait", "//vendor:k8s.io/apiserver/pkg/endpoints", "//vendor:k8s.io/apiserver/pkg/endpoints/handlers", "//vendor:k8s.io/apiserver/pkg/endpoints/request", @@ -30,6 +39,9 @@ go_library( "//vendor:k8s.io/apiserver/pkg/server", "//vendor:k8s.io/apiserver/pkg/server/storage", "//vendor:k8s.io/apiserver/pkg/storage/storagebackend", + "//vendor:k8s.io/client-go/tools/cache", + "//vendor:k8s.io/client-go/util/workqueue", + "//vendor:k8s.io/kube-aggregator/pkg/apis/apiregistration", ], ) @@ -45,3 +57,19 @@ filegroup( srcs = [":package-srcs"], tags = ["automanaged"], ) + +go_test( + name = "go_default_test", + srcs = ["tprregistration_controller_test.go"], + library = ":go_default_library", + tags = ["automanaged"], + deps = [ + "//pkg/apis/extensions:go_default_library", + "//pkg/client/listers/extensions/internalversion:go_default_library", + "//vendor:k8s.io/apimachinery/pkg/apis/meta/v1", + "//vendor:k8s.io/apimachinery/pkg/runtime/schema", + "//vendor:k8s.io/client-go/tools/cache", + "//vendor:k8s.io/client-go/util/workqueue", + "//vendor:k8s.io/kube-aggregator/pkg/apis/apiregistration", + ], +) diff --git a/pkg/master/thirdparty/tprregistration_controller.go b/pkg/master/thirdparty/tprregistration_controller.go new file mode 100644 index 00000000000..f9b5afb9fae --- /dev/null +++ b/pkg/master/thirdparty/tprregistration_controller.go @@ -0,0 +1,214 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package thirdparty + +import ( + "fmt" + "time" + + "github.com/golang/glog" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/labels" + "k8s.io/apimachinery/pkg/runtime/schema" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/kube-aggregator/pkg/apis/apiregistration" + "k8s.io/kubernetes/pkg/apis/extensions" + informers "k8s.io/kubernetes/pkg/client/informers/informers_generated/internalversion/extensions/internalversion" + listers "k8s.io/kubernetes/pkg/client/listers/extensions/internalversion" + "k8s.io/kubernetes/pkg/registry/extensions/thirdpartyresourcedata" +) + +// AutoAPIServiceRegistration is an interface which callers can re-declare locally and properly cast to for +// adding and removing APIServices +type AutoAPIServiceRegistration interface { + // AddAPIServiceToSync adds an API service to auto-register. + AddAPIServiceToSync(in *apiregistration.APIService) + // RemoveAPIServiceToSync removes an API service to auto-register. + RemoveAPIServiceToSync(name string) +} + +type tprRegistrationController struct { + tprLister listers.ThirdPartyResourceLister + tprSynced cache.InformerSynced + + apiServiceRegistration AutoAPIServiceRegistration + + syncHandler func(groupVersion schema.GroupVersion) error + + // queue is where incoming work is placed to de-dup and to allow "easy" rate limited requeues on errors + // this is actually keyed by a groupVersion + queue workqueue.RateLimitingInterface +} + +// NewAutoRegistrationController returns a controller which will register TPR GroupVersions with the auto APIService registration +// controller so they automatically stay in sync. +func NewAutoRegistrationController(tprInformer informers.ThirdPartyResourceInformer, apiServiceRegistration AutoAPIServiceRegistration) *tprRegistrationController { + c := &tprRegistrationController{ + tprLister: tprInformer.Lister(), + tprSynced: tprInformer.Informer().HasSynced, + apiServiceRegistration: apiServiceRegistration, + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "tpr-autoregister"), + } + c.syncHandler = c.handleTPR + + tprInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { + cast := obj.(*extensions.ThirdPartyResource) + c.enqueueTPR(cast) + }, + UpdateFunc: func(_, obj interface{}) { + cast := obj.(*extensions.ThirdPartyResource) + c.enqueueTPR(cast) + }, + DeleteFunc: func(obj interface{}) { + cast, ok := obj.(*extensions.ThirdPartyResource) + if !ok { + tombstone, ok := obj.(cache.DeletedFinalStateUnknown) + if !ok { + glog.V(2).Infof("Couldn't get object from tombstone %#v", obj) + return + } + cast, ok = tombstone.Obj.(*extensions.ThirdPartyResource) + if !ok { + glog.V(2).Infof("Tombstone contained unexpected object: %#v", obj) + return + } + } + c.enqueueTPR(cast) + }, + }) + + return c +} + +func (c *tprRegistrationController) Run(threadiness int, stopCh chan struct{}) { + // don't let panics crash the process + defer utilruntime.HandleCrash() + // make sure the work queue is shutdown which will trigger workers to end + defer c.queue.ShutDown() + + glog.Infof("Starting tpr-autoregister controller") + defer glog.Infof("Shutting down tpr-autoregister controller") + + // wait for your secondary caches to fill before starting your work + if !cache.WaitForCacheSync(stopCh, c.tprSynced) { + return + } + + // start up your worker threads based on threadiness. Some controllers have multiple kinds of workers + for i := 0; i < threadiness; i++ { + // runWorker will loop until "something bad" happens. The .Until will then rekick the worker + // after one second + go wait.Until(c.runWorker, time.Second, stopCh) + } + + // wait until we're told to stop + <-stopCh +} + +func (c *tprRegistrationController) runWorker() { + // hot loop until we're told to stop. processNextWorkItem will automatically wait until there's work + // available, so we don't worry about secondary waits + for c.processNextWorkItem() { + } +} + +// processNextWorkItem deals with one key off the queue. It returns false when it's time to quit. +func (c *tprRegistrationController) processNextWorkItem() bool { + // pull the next work item from queue. It should be a key we use to lookup something in a cache + key, quit := c.queue.Get() + if quit { + return false + } + // you always have to indicate to the queue that you've completed a piece of work + defer c.queue.Done(key) + + // do your work on the key. This method will contains your "do stuff" logic + err := c.syncHandler(key.(schema.GroupVersion)) + if err == nil { + // if you had no error, tell the queue to stop tracking history for your key. This will + // reset things like failure counts for per-item rate limiting + c.queue.Forget(key) + return true + } + + // there was a failure so be sure to report it. This method allows for pluggable error handling + // which can be used for things like cluster-monitoring + utilruntime.HandleError(fmt.Errorf("%v failed with : %v", key, err)) + // since we failed, we should requeue the item to work on later. This method will add a backoff + // to avoid hotlooping on particular items (they're probably still not going to work right away) + // and overall controller protection (everything I've done is broken, this controller needs to + // calm down or it can starve other useful work) cases. + c.queue.AddRateLimited(key) + + return true +} + +func (c *tprRegistrationController) enqueueTPR(tpr *extensions.ThirdPartyResource) { + _, group, err := thirdpartyresourcedata.ExtractApiGroupAndKind(tpr) + if err != nil { + utilruntime.HandleError(err) + return + } + for _, version := range tpr.Versions { + c.queue.Add(schema.GroupVersion{Group: group, Version: version.Name}) + } +} + +func (c *tprRegistrationController) handleTPR(groupVersion schema.GroupVersion) error { + // check all TPRs. There shouldn't that many, but if we have problems later we can index them + tprs, err := c.tprLister.List(labels.Everything()) + if err != nil { + return err + } + + found := false + for _, tpr := range tprs { + _, group, err := thirdpartyresourcedata.ExtractApiGroupAndKind(tpr) + if err != nil { + return err + } + for _, version := range tpr.Versions { + if version.Name == groupVersion.Version && group == groupVersion.Group { + found = true + break + } + } + } + + apiServiceName := groupVersion.Version + "." + groupVersion.Group + + if !found { + c.apiServiceRegistration.RemoveAPIServiceToSync(apiServiceName) + return nil + } + + c.apiServiceRegistration.AddAPIServiceToSync(&apiregistration.APIService{ + ObjectMeta: metav1.ObjectMeta{Name: apiServiceName}, + Spec: apiregistration.APIServiceSpec{ + Group: groupVersion.Group, + Version: groupVersion.Version, + Priority: 500, // TPRs should have relatively low priority + }, + }) + + return nil +} diff --git a/pkg/master/thirdparty/tprregistration_controller_test.go b/pkg/master/thirdparty/tprregistration_controller_test.go new file mode 100644 index 00000000000..1782a78177e --- /dev/null +++ b/pkg/master/thirdparty/tprregistration_controller_test.go @@ -0,0 +1,141 @@ +/* +Copyright 2017 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package thirdparty + +import ( + "reflect" + "testing" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/client-go/tools/cache" + "k8s.io/client-go/util/workqueue" + "k8s.io/kube-aggregator/pkg/apis/apiregistration" + "k8s.io/kubernetes/pkg/apis/extensions" + listers "k8s.io/kubernetes/pkg/client/listers/extensions/internalversion" +) + +func TestEnqueue(t *testing.T) { + c := tprRegistrationController{ + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "tpr-autoregister"), + } + + tpr := &extensions.ThirdPartyResource{ + ObjectMeta: metav1.ObjectMeta{Name: "resource.group.example.com"}, + Versions: []extensions.APIVersion{ + {Name: "v1alpha1"}, + {Name: "v1"}, + }, + } + c.enqueueTPR(tpr) + + first, _ := c.queue.Get() + expectedFirst := schema.GroupVersion{Group: "group.example.com", Version: "v1alpha1"} + if first != expectedFirst { + t.Errorf("expected %v, got %v", expectedFirst, first) + } + + second, _ := c.queue.Get() + expectedSecond := schema.GroupVersion{Group: "group.example.com", Version: "v1"} + if second != expectedSecond { + t.Errorf("expected %v, got %v", expectedSecond, second) + } +} + +func TestHandleTPR(t *testing.T) { + tests := []struct { + name string + startingTPRs []*extensions.ThirdPartyResource + version schema.GroupVersion + + expectedAdded []*apiregistration.APIService + expectedRemoved []string + }{ + { + name: "simple add", + startingTPRs: []*extensions.ThirdPartyResource{ + { + ObjectMeta: metav1.ObjectMeta{Name: "resource.group.com"}, + Versions: []extensions.APIVersion{ + {Name: "v1"}, + }, + }, + }, + version: schema.GroupVersion{Group: "group.com", Version: "v1"}, + + expectedAdded: []*apiregistration.APIService{ + { + ObjectMeta: metav1.ObjectMeta{Name: "v1.group.com"}, + Spec: apiregistration.APIServiceSpec{ + Group: "group.com", + Version: "v1", + Priority: 500, + }, + }, + }, + }, + { + name: "simple remove", + startingTPRs: []*extensions.ThirdPartyResource{ + { + ObjectMeta: metav1.ObjectMeta{Name: "resource.group.com"}, + Versions: []extensions.APIVersion{ + {Name: "v1"}, + }, + }, + }, + version: schema.GroupVersion{Group: "group.com", Version: "v2"}, + + expectedRemoved: []string{"v2.group.com"}, + }, + } + + for _, test := range tests { + registration := &fakeAPIServiceRegistration{} + tprCache := cache.NewIndexer(cache.DeletionHandlingMetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) + tprLister := listers.NewThirdPartyResourceLister(tprCache) + c := tprRegistrationController{ + tprLister: tprLister, + apiServiceRegistration: registration, + } + for i := range test.startingTPRs { + tprCache.Add(test.startingTPRs[i]) + } + + c.handleTPR(test.version) + + if !reflect.DeepEqual(test.expectedAdded, registration.added) { + t.Errorf("%s expected %v, got %v", test.name, test.expectedAdded, registration.added) + } + if !reflect.DeepEqual(test.expectedRemoved, registration.removed) { + t.Errorf("%s expected %v, got %v", test.name, test.expectedRemoved, registration.removed) + } + } + +} + +type fakeAPIServiceRegistration struct { + added []*apiregistration.APIService + removed []string +} + +func (a *fakeAPIServiceRegistration) AddAPIServiceToSync(in *apiregistration.APIService) { + a.added = append(a.added, in) +} +func (a *fakeAPIServiceRegistration) RemoveAPIServiceToSync(name string) { + a.removed = append(a.removed, name) +}