From 9a8b111c9c37cec00aacbee3336b198dd915c132 Mon Sep 17 00:00:00 2001 From: Jordan Liggitt Date: Tue, 29 Aug 2017 21:08:07 -0400 Subject: [PATCH 1/4] Limit APIService healthz check to startup --- cmd/kube-apiserver/app/BUILD | 3 +- cmd/kube-apiserver/app/aggregator.go | 79 ++++++++++++++++++---------- 2 files changed, 52 insertions(+), 30 deletions(-) diff --git a/cmd/kube-apiserver/app/BUILD b/cmd/kube-apiserver/app/BUILD index 563922ce5cb..d816bb0a758 100644 --- a/cmd/kube-apiserver/app/BUILD +++ b/cmd/kube-apiserver/app/BUILD @@ -50,7 +50,6 @@ go_library( "//vendor/k8s.io/apiextensions-apiserver/pkg/client/informers/internalversion:go_default_library", "//vendor/k8s.io/apiextensions-apiserver/pkg/cmd/server:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library", @@ -68,10 +67,12 @@ go_library( "//vendor/k8s.io/apiserver/pkg/storage/etcd3/preflight:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", + "//vendor/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library", "//vendor/k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1:go_default_library", "//vendor/k8s.io/kube-aggregator/pkg/apiserver:go_default_library", "//vendor/k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset/typed/apiregistration/internalversion:go_default_library", + "//vendor/k8s.io/kube-aggregator/pkg/client/informers/internalversion/apiregistration/internalversion:go_default_library", "//vendor/k8s.io/kube-aggregator/pkg/controllers/autoregister:go_default_library", "//vendor/k8s.io/kube-openapi/pkg/common:go_default_library", ], diff --git a/cmd/kube-apiserver/app/aggregator.go b/cmd/kube-apiserver/app/aggregator.go index 2be47d681da..110dfd86b24 100644 --- a/cmd/kube-apiserver/app/aggregator.go +++ b/cmd/kube-apiserver/app/aggregator.go @@ -24,21 +24,24 @@ import ( "io/ioutil" "net/http" "strings" + "sync" "github.com/golang/glog" apiextensionsinformers "k8s.io/apiextensions-apiserver/pkg/client/informers/internalversion" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/sets" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/healthz" genericoptions "k8s.io/apiserver/pkg/server/options" kubeexternalinformers "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" "k8s.io/kube-aggregator/pkg/apis/apiregistration" "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1" aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver" apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset/typed/apiregistration/internalversion" + informers "k8s.io/kube-aggregator/pkg/client/informers/internalversion/apiregistration/internalversion" "k8s.io/kube-aggregator/pkg/controllers/autoregister" "k8s.io/kubernetes/cmd/kube-apiserver/app/options" "k8s.io/kubernetes/pkg/master/controller/crdregistration" @@ -106,35 +109,14 @@ func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delega go crdRegistrationController.Run(5, context.StopCh) return nil }) - aggregatorServer.GenericAPIServer.AddHealthzChecks(healthz.NamedCheck("autoregister-completion", func(r *http.Request) error { - items, err := aggregatorServer.APIRegistrationInformers.Apiregistration().InternalVersion().APIServices().Lister().List(labels.Everything()) - if err != nil { - return err - } - missing := []apiregistration.APIService{} - for _, apiService := range apiServices { - found := false - for _, item := range items { - if item.Name != apiService.Name { - continue - } - if apiregistration.IsAPIServiceConditionTrue(item, apiregistration.Available) { - found = true - break - } - } - - if !found { - missing = append(missing, *apiService) - } - } - - if len(missing) > 0 { - return fmt.Errorf("missing APIService: %v", missing) - } - return nil - })) + aggregatorServer.GenericAPIServer.AddHealthzChecks( + makeAPIServiceAvailableHealthzCheck( + "autoregister-completion", + apiServices, + aggregatorServer.APIRegistrationInformers.Apiregistration().InternalVersion().APIServices(), + ), + ) return aggregatorServer, nil } @@ -158,6 +140,45 @@ func makeAPIService(gv schema.GroupVersion) *apiregistration.APIService { } } +// makeAPIServiceAvailableHealthzCheck returns a healthz check that returns healthy +// once all of the specified services have been observed to be available at least once. +func makeAPIServiceAvailableHealthzCheck(name string, apiServices []*apiregistration.APIService, apiServiceInformer informers.APIServiceInformer) healthz.HealthzChecker { + // Track the auto-registered API services that have not been observed to be available yet + pendingServiceNamesLock := &sync.RWMutex{} + pendingServiceNames := sets.NewString() + for _, service := range apiServices { + pendingServiceNames.Insert(service.Name) + } + + // When an APIService in the list is seen as available, remove it from the pending list + handleAPIServiceChange := func(service *apiregistration.APIService) { + pendingServiceNamesLock.Lock() + defer pendingServiceNamesLock.Unlock() + if !pendingServiceNames.Has(service.Name) { + return + } + if apiregistration.IsAPIServiceConditionTrue(service, apiregistration.Available) { + pendingServiceNames.Delete(service.Name) + } + } + + // Watch add/update events for APIServices + apiServiceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { handleAPIServiceChange(obj.(*apiregistration.APIService)) }, + UpdateFunc: func(old, new interface{}) { handleAPIServiceChange(new.(*apiregistration.APIService)) }, + }) + + // Don't return healthy until the pending list is empty + return healthz.NamedCheck(name, func(r *http.Request) error { + pendingServiceNamesLock.RLock() + defer pendingServiceNamesLock.RUnlock() + if pendingServiceNames.Len() > 0 { + return fmt.Errorf("missing APIService: %v", pendingServiceNames.List()) + } + return nil + }) +} + type priority struct { group int32 version int32 From d353adc4675deb767edaafe791ef3e6e5e561ec1 Mon Sep 17 00:00:00 2001 From: Jordan Liggitt Date: Sat, 2 Sep 2017 12:57:58 -0400 Subject: [PATCH 2/4] Make local APIService objects available on create --- .../pkg/apis/apiregistration/helpers.go | 12 ++++++++++++ .../pkg/controllers/status/available_controller.go | 5 +---- .../pkg/registry/apiservice/strategy.go | 5 +++++ 3 files changed, 18 insertions(+), 4 deletions(-) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration/helpers.go b/staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration/helpers.go index 1f6790b32e3..5e36e7db26e 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration/helpers.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration/helpers.go @@ -20,6 +20,7 @@ import ( "sort" "strings" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" ) @@ -85,6 +86,17 @@ func APIServiceNameToGroupVersion(apiServiceName string) schema.GroupVersion { return schema.GroupVersion{Group: tokens[1], Version: tokens[0]} } +// NewLocalAvailableAPIServiceCondition returns a condition for an available local APIService. +func NewLocalAvailableAPIServiceCondition() APIServiceCondition { + return APIServiceCondition{ + Type: Available, + Status: ConditionTrue, + LastTransitionTime: metav1.Now(), + Reason: "Local", + Message: "Local APIServices are always available", + } +} + // SetAPIServiceCondition sets the status condition. It either overwrites the existing one or // creates a new one func SetAPIServiceCondition(apiService *APIService, newCondition APIServiceCondition) { diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go index 7516749e4af..df6a2b0f4de 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/status/available_controller.go @@ -120,10 +120,7 @@ func (c *AvailableConditionController) sync(key string) error { // local API services are always considered available if apiService.Spec.Service == nil { - availableCondition.Status = apiregistration.ConditionTrue - availableCondition.Reason = "Local" - availableCondition.Message = "Local APIServices are always available" - apiregistration.SetAPIServiceCondition(apiService, availableCondition) + apiregistration.SetAPIServiceCondition(apiService, apiregistration.NewLocalAvailableAPIServiceCondition()) _, err := c.apiServiceClient.APIServices().UpdateStatus(apiService) return err } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/registry/apiservice/strategy.go b/staging/src/k8s.io/kube-aggregator/pkg/registry/apiservice/strategy.go index fdcb6637f76..51887d71937 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/registry/apiservice/strategy.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/registry/apiservice/strategy.go @@ -48,6 +48,11 @@ func (apiServerStrategy) NamespaceScoped() bool { func (apiServerStrategy) PrepareForCreate(ctx genericapirequest.Context, obj runtime.Object) { apiservice := obj.(*apiregistration.APIService) apiservice.Status = apiregistration.APIServiceStatus{} + + // mark local API services as immediately available on create + if apiservice.Spec.Service == nil { + apiregistration.SetAPIServiceCondition(apiservice, apiregistration.NewLocalAvailableAPIServiceCondition()) + } } func (apiServerStrategy) PrepareForUpdate(ctx genericapirequest.Context, obj, old runtime.Object) { From 0529dd405bad3849ab1e9a461f6fabae214849e0 Mon Sep 17 00:00:00 2001 From: Jordan Liggitt Date: Sat, 2 Sep 2017 13:21:26 -0400 Subject: [PATCH 3/4] Prevent flutter of CRD APIServices on start --- cmd/kube-apiserver/app/aggregator.go | 7 +++++- .../crdregistration_controller.go | 22 ++++++++++++++++++- 2 files changed, 27 insertions(+), 2 deletions(-) diff --git a/cmd/kube-apiserver/app/aggregator.go b/cmd/kube-apiserver/app/aggregator.go index 110dfd86b24..6641a0be699 100644 --- a/cmd/kube-apiserver/app/aggregator.go +++ b/cmd/kube-apiserver/app/aggregator.go @@ -105,8 +105,13 @@ func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delega autoRegistrationController) aggregatorServer.GenericAPIServer.AddPostStartHook("kube-apiserver-autoregistration", func(context genericapiserver.PostStartHookContext) error { - go autoRegistrationController.Run(5, context.StopCh) go crdRegistrationController.Run(5, context.StopCh) + go func() { + // let the CRD controller process the initial set of CRDs before starting the autoregistration controller. + // this prevents the autoregistration controller's initial sync from deleting APIServices for CRDs that still exist. + crdRegistrationController.WaitForInitialSync() + autoRegistrationController.Run(5, context.StopCh) + }() return nil }) diff --git a/pkg/master/controller/crdregistration/crdregistration_controller.go b/pkg/master/controller/crdregistration/crdregistration_controller.go index 62b29114165..5323206f9d6 100644 --- a/pkg/master/controller/crdregistration/crdregistration_controller.go +++ b/pkg/master/controller/crdregistration/crdregistration_controller.go @@ -53,6 +53,8 @@ type crdRegistrationController struct { syncHandler func(groupVersion schema.GroupVersion) error + syncedInitialSet chan struct{} + // queue is where incoming work is placed to de-dup and to allow "easy" rate limited requeues on errors // this is actually keyed by a groupVersion queue workqueue.RateLimitingInterface @@ -67,7 +69,8 @@ func NewAutoRegistrationController(crdinformer crdinformers.CustomResourceDefini crdLister: crdinformer.Lister(), crdSynced: crdinformer.Informer().HasSynced, apiServiceRegistration: apiServiceRegistration, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "crd-autoregister"), + syncedInitialSet: make(chan struct{}), + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "crd-autoregister"), } c.syncHandler = c.handleVersionUpdate @@ -114,6 +117,18 @@ func (c *crdRegistrationController) Run(threadiness int, stopCh <-chan struct{}) return } + // process each item in the list once + if crds, err := c.crdLister.List(labels.Everything()); err != nil { + utilruntime.HandleError(err) + } else { + for _, crd := range crds { + if err := c.syncHandler(schema.GroupVersion{Group: crd.Spec.Group, Version: crd.Spec.Version}); err != nil { + utilruntime.HandleError(err) + } + } + } + close(c.syncedInitialSet) + // start up your worker threads based on threadiness. Some controllers have multiple kinds of workers for i := 0; i < threadiness; i++ { // runWorker will loop until "something bad" happens. The .Until will then rekick the worker @@ -125,6 +140,11 @@ func (c *crdRegistrationController) Run(threadiness int, stopCh <-chan struct{}) <-stopCh } +// WaitForInitialSync blocks until the initial set of CRD resources has been processed +func (c *crdRegistrationController) WaitForInitialSync() { + <-c.syncedInitialSet +} + func (c *crdRegistrationController) runWorker() { // hot loop until we're told to stop. processNextWorkItem will automatically wait until there's work // available, so we don't worry about secondary waits From 8ca6d9994e51e6b7efd53ee5239928b4cd9f24bf Mon Sep 17 00:00:00 2001 From: Jordan Liggitt Date: Tue, 29 Aug 2017 23:19:34 -0400 Subject: [PATCH 4/4] Sync local APIService objects once --- cmd/kube-apiserver/app/aggregator.go | 4 +- .../pkg/controllers/autoregister/BUILD | 1 + .../autoregister/autoregister_controller.go | 118 ++++++++++++-- .../autoregister_controller_test.go | 150 ++++++++++++++++-- 4 files changed, 251 insertions(+), 22 deletions(-) diff --git a/cmd/kube-apiserver/app/aggregator.go b/cmd/kube-apiserver/app/aggregator.go index 6641a0be699..80e6ad1d9d4 100644 --- a/cmd/kube-apiserver/app/aggregator.go +++ b/cmd/kube-apiserver/app/aggregator.go @@ -229,7 +229,7 @@ func apiServicesToRegister(delegateAPIServer genericapiserver.DelegationTarget, for _, curr := range delegateAPIServer.ListedPaths() { if curr == "/api/v1" { apiService := makeAPIService(schema.GroupVersion{Group: "", Version: "v1"}) - registration.AddAPIServiceToSync(apiService) + registration.AddAPIServiceToSyncOnStart(apiService) apiServices = append(apiServices, apiService) continue } @@ -247,7 +247,7 @@ func apiServicesToRegister(delegateAPIServer genericapiserver.DelegationTarget, if apiService == nil { continue } - registration.AddAPIServiceToSync(apiService) + registration.AddAPIServiceToSyncOnStart(apiService) apiServices = append(apiServices, apiService) } diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/autoregister/BUILD b/staging/src/k8s.io/kube-aggregator/pkg/controllers/autoregister/BUILD index 15e5ec21407..0688ca9a459 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/autoregister/BUILD +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/autoregister/BUILD @@ -27,6 +27,7 @@ go_library( deps = [ "//vendor/github.com/golang/glog:go_default_library", "//vendor/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/runtime:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//vendor/k8s.io/client-go/tools/cache:go_default_library", diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/autoregister/autoregister_controller.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/autoregister/autoregister_controller.go index 4fe8aa6b46f..c6051136b12 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/autoregister/autoregister_controller.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/autoregister/autoregister_controller.go @@ -25,6 +25,7 @@ import ( "github.com/golang/glog" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/labels" utilruntime "k8s.io/apimachinery/pkg/util/runtime" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" @@ -39,12 +40,19 @@ import ( const ( AutoRegisterManagedLabel = "kube-aggregator.kubernetes.io/automanaged" + + // manageOnStart is a value for the AutoRegisterManagedLabel that indicates the APIService wants to be synced one time when the controller starts. + manageOnStart = "onstart" + // manageContinuously is a value for the AutoRegisterManagedLabel that indicates the APIService wants to be synced continuously. + manageContinuously = "true" ) // AutoAPIServiceRegistration is an interface which callers can re-declare locally and properly cast to for // adding and removing APIServices type AutoAPIServiceRegistration interface { - // AddAPIServiceToSync adds an API service to auto-register. + // AddAPIServiceToSyncOnStart adds an API service to sync on start. + AddAPIServiceToSyncOnStart(in *apiregistration.APIService) + // AddAPIServiceToSync adds an API service to sync continuously. AddAPIServiceToSync(in *apiregistration.APIService) // RemoveAPIServiceToSync removes an API service to auto-register. RemoveAPIServiceToSync(name string) @@ -62,6 +70,13 @@ type autoRegisterController struct { syncHandler func(apiServiceName string) error + // track which services we have synced + syncedSuccessfullyLock *sync.RWMutex + syncedSuccessfully map[string]bool + + // remember names of services that existed when we started + apiServicesAtStart map[string]bool + // queue is where incoming work is placed to de-dup and to allow "easy" rate limited requeues on errors queue workqueue.RateLimitingInterface } @@ -72,7 +87,13 @@ func NewAutoRegisterController(apiServiceInformer informers.APIServiceInformer, apiServiceSynced: apiServiceInformer.Informer().HasSynced, apiServiceClient: apiServiceClient, apiServicesToSync: map[string]*apiregistration.APIService{}, - queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "autoregister"), + + apiServicesAtStart: map[string]bool{}, + + syncedSuccessfullyLock: &sync.RWMutex{}, + syncedSuccessfully: map[string]bool{}, + + queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "autoregister"), } c.syncHandler = c.checkAPIService @@ -120,6 +141,13 @@ func (c *autoRegisterController) Run(threadiness int, stopCh <-chan struct{}) { return } + // record APIService objects that existed when we started + if services, err := c.apiServiceLister.List(labels.Everything()); err == nil { + for _, service := range services { + c.apiServicesAtStart[service.Name] = true + } + } + // start up your worker threads based on threadiness. Some controllers have multiple kinds of workers for i := 0; i < threadiness; i++ { // runWorker will loop until "something bad" happens. The .Until will then rekick the worker @@ -169,29 +197,61 @@ func (c *autoRegisterController) processNextWorkItem() bool { return true } -func (c *autoRegisterController) checkAPIService(name string) error { +// checkAPIService syncs the current APIService against a list of desired APIService objects +// +// | A. desired: not found | B. desired: sync on start | C. desired: sync always +// ------------------------------------------------|-----------------------|---------------------------|------------------------ +// 1. current: lookup error | error | error | error +// 2. current: not found | - | create once | create +// 3. current: no sync | - | - | - +// 4. current: sync on start, not present at start | - | - | - +// 5. current: sync on start, present at start | delete once | update once | update once +// 6. current: sync always | delete | update once | update +func (c *autoRegisterController) checkAPIService(name string) (err error) { desired := c.GetAPIServiceToSync(name) curr, err := c.apiServiceLister.Get(name) + // if we've never synced this service successfully, record a successful sync. + hasSynced := c.hasSyncedSuccessfully(name) + if !hasSynced { + defer func() { + if err == nil { + c.setSyncedSuccessfully(name) + } + }() + } + switch { - // we had a real error, just return it + // we had a real error, just return it (1A,1B,1C) case err != nil && !apierrors.IsNotFound(err): return err - // we don't have an entry and we don't want one + // we don't have an entry and we don't want one (2A) case apierrors.IsNotFound(err) && desired == nil: return nil - // we don't have an entry and we do want one + // the local object only wants to sync on start and has already synced (2B,5B,6B "once" enforcement) + case isAutomanagedOnStart(desired) && hasSynced: + return nil + + // we don't have an entry and we do want one (2B,2C) case apierrors.IsNotFound(err) && desired != nil: _, err := c.apiServiceClient.APIServices().Create(desired) return err - // we aren't trying to manage this APIService. If the user removes the label, he's taken over management himself - case curr.Labels[AutoRegisterManagedLabel] != "true": + // we aren't trying to manage this APIService (3A,3B,3C) + case !isAutomanaged(curr): return nil - // we have a spurious APIService that we're managing, delete it + // the remote object only wants to sync on start, but was added after we started (4A,4B,4C) + case isAutomanagedOnStart(curr) && !c.apiServicesAtStart[name]: + return nil + + // the remote object only wants to sync on start and has already synced (5A,5B,5C "once" enforcement) + case isAutomanagedOnStart(curr) && hasSynced: + return nil + + // we have a spurious APIService that we're managing, delete it (5A,6A) case desired == nil: return c.apiServiceClient.APIServices().Delete(curr.Name, nil) @@ -200,7 +260,7 @@ func (c *autoRegisterController) checkAPIService(name string) error { return nil } - // we have an entry and we have a desired, now we deconflict. Only a few fields matter. + // we have an entry and we have a desired, now we deconflict. Only a few fields matter. (5B,5C,6B,6C) apiService := curr.DeepCopy() apiService.Spec = desired.Spec _, err = c.apiServiceClient.APIServices().Update(apiService) @@ -214,7 +274,15 @@ func (c *autoRegisterController) GetAPIServiceToSync(name string) *apiregistrati return c.apiServicesToSync[name] } +func (c *autoRegisterController) AddAPIServiceToSyncOnStart(in *apiregistration.APIService) { + c.addAPIServiceToSync(in, manageOnStart) +} + func (c *autoRegisterController) AddAPIServiceToSync(in *apiregistration.APIService) { + c.addAPIServiceToSync(in, manageContinuously) +} + +func (c *autoRegisterController) addAPIServiceToSync(in *apiregistration.APIService, syncType string) { c.apiServicesToSyncLock.Lock() defer c.apiServicesToSyncLock.Unlock() @@ -222,7 +290,7 @@ func (c *autoRegisterController) AddAPIServiceToSync(in *apiregistration.APIServ if apiService.Labels == nil { apiService.Labels = map[string]string{} } - apiService.Labels[AutoRegisterManagedLabel] = "true" + apiService.Labels[AutoRegisterManagedLabel] = syncType c.apiServicesToSync[apiService.Name] = apiService c.queue.Add(apiService.Name) @@ -235,3 +303,31 @@ func (c *autoRegisterController) RemoveAPIServiceToSync(name string) { delete(c.apiServicesToSync, name) c.queue.Add(name) } + +func (c *autoRegisterController) hasSyncedSuccessfully(name string) bool { + c.syncedSuccessfullyLock.RLock() + defer c.syncedSuccessfullyLock.RUnlock() + return c.syncedSuccessfully[name] +} + +func (c *autoRegisterController) setSyncedSuccessfully(name string) { + c.syncedSuccessfullyLock.Lock() + defer c.syncedSuccessfullyLock.Unlock() + c.syncedSuccessfully[name] = true +} + +func automanagedType(service *apiregistration.APIService) string { + if service == nil { + return "" + } + return service.Labels[AutoRegisterManagedLabel] +} + +func isAutomanagedOnStart(service *apiregistration.APIService) bool { + return automanagedType(service) == manageOnStart +} + +func isAutomanaged(service *apiregistration.APIService) bool { + managedType := automanagedType(service) + return managedType == manageOnStart || managedType == manageContinuously +} diff --git a/staging/src/k8s.io/kube-aggregator/pkg/controllers/autoregister/autoregister_controller_test.go b/staging/src/k8s.io/kube-aggregator/pkg/controllers/autoregister/autoregister_controller_test.go index 9a6181f8361..0fd46981023 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/controllers/autoregister/autoregister_controller_test.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/controllers/autoregister/autoregister_controller_test.go @@ -18,6 +18,7 @@ package autoregister import ( "fmt" + "sync" "testing" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -35,6 +36,12 @@ func newAutoRegisterManagedAPIService(name string) *apiregistration.APIService { } } +func newAutoRegisterManagedOnStartAPIService(name string) *apiregistration.APIService { + return &apiregistration.APIService{ + ObjectMeta: metav1.ObjectMeta{Name: name, Labels: map[string]string{AutoRegisterManagedLabel: string("onstart")}}, + } +} + func newAutoRegisterManagedModifiedAPIService(name string) *apiregistration.APIService { return &apiregistration.APIService{ ObjectMeta: metav1.ObjectMeta{Name: name, Labels: map[string]string{AutoRegisterManagedLabel: string("true")}}, @@ -44,6 +51,15 @@ func newAutoRegisterManagedModifiedAPIService(name string) *apiregistration.APIS } } +func newAutoRegisterManagedOnStartModifiedAPIService(name string) *apiregistration.APIService { + return &apiregistration.APIService{ + ObjectMeta: metav1.ObjectMeta{Name: name, Labels: map[string]string{AutoRegisterManagedLabel: string("onstart")}}, + Spec: apiregistration.APIServiceSpec{ + Group: "something", + }, + } +} + func newAPIService(name string) *apiregistration.APIService { return &apiregistration.APIService{ ObjectMeta: metav1.ObjectMeta{Name: name}, @@ -80,6 +96,28 @@ func checkForCreate(name string, client *fake.Clientset) error { return nil } +func checkForCreateOnStart(name string, client *fake.Clientset) error { + if len(client.Actions()) == 0 { + return nil + } + if len(client.Actions()) > 1 { + return fmt.Errorf("unexpected action: %v", client.Actions()) + } + + action := client.Actions()[0] + + createAction, ok := action.(clienttesting.CreateAction) + if !ok { + return fmt.Errorf("unexpected action: %v", client.Actions()) + } + apiService := createAction.GetObject().(*apiregistration.APIService) + if apiService.Name != name || apiService.Labels[AutoRegisterManagedLabel] != "onstart" { + return fmt.Errorf("bad name or label %v", createAction) + } + + return nil +} + func checkForUpdate(name string, client *fake.Clientset) error { if len(client.Actions()) == 0 { return nil @@ -121,13 +159,16 @@ func checkForDelete(name string, client *fake.Clientset) error { func TestSync(t *testing.T) { tests := []struct { - name string - apiServiceName string - addAPIServices []*apiregistration.APIService - updateAPIServices []*apiregistration.APIService - addSyncAPIServices []*apiregistration.APIService - delSyncAPIServices []string - expectedResults func(name string, client *fake.Clientset) error + name string + apiServiceName string + addAPIServices []*apiregistration.APIService + updateAPIServices []*apiregistration.APIService + addSyncAPIServices []*apiregistration.APIService + addSyncOnStartAPIServices []*apiregistration.APIService + delSyncAPIServices []string + alreadySynced map[string]bool + presentAtStart map[string]bool + expectedResults func(name string, client *fake.Clientset) error }{ { name: "adding an API service which isn't auto-managed does nothing", @@ -166,7 +207,7 @@ func TestSync(t *testing.T) { expectedResults: checkForDelete, }, { - name: "removing auto-manged then RemoveAPIService should not touch APIService", + name: "removing auto-managed then RemoveAPIService should not touch APIService", apiServiceName: "foo", addAPIServices: []*apiregistration.APIService{}, updateAPIServices: []*apiregistration.APIService{newAPIService("foo")}, @@ -192,17 +233,104 @@ func TestSync(t *testing.T) { delSyncAPIServices: []string{}, expectedResults: checkForUpdate, }, + + { + name: "adding one to auto-register on start should create", + apiServiceName: "foo", + addAPIServices: []*apiregistration.APIService{}, + updateAPIServices: []*apiregistration.APIService{}, + addSyncOnStartAPIServices: []*apiregistration.APIService{newAPIService("foo")}, + delSyncAPIServices: []string{}, + expectedResults: checkForCreateOnStart, + }, + { + name: "adding one to auto-register on start already synced should do nothing", + apiServiceName: "foo", + addAPIServices: []*apiregistration.APIService{}, + updateAPIServices: []*apiregistration.APIService{}, + addSyncOnStartAPIServices: []*apiregistration.APIService{newAPIService("foo")}, + delSyncAPIServices: []string{}, + alreadySynced: map[string]bool{"foo": true}, + expectedResults: checkForNothing, + }, + { + name: "managed onstart apiservice present at start without a matching request should delete", + apiServiceName: "foo", + addAPIServices: []*apiregistration.APIService{newAPIService("foo")}, + updateAPIServices: []*apiregistration.APIService{newAutoRegisterManagedOnStartAPIService("foo")}, + addSyncAPIServices: []*apiregistration.APIService{}, + delSyncAPIServices: []string{}, + presentAtStart: map[string]bool{"foo": true}, + alreadySynced: map[string]bool{}, + expectedResults: checkForDelete, + }, + { + name: "managed onstart apiservice present at start without a matching request already synced once should no-op", + apiServiceName: "foo", + addAPIServices: []*apiregistration.APIService{newAPIService("foo")}, + updateAPIServices: []*apiregistration.APIService{newAutoRegisterManagedOnStartAPIService("foo")}, + addSyncAPIServices: []*apiregistration.APIService{}, + delSyncAPIServices: []string{}, + presentAtStart: map[string]bool{"foo": true}, + alreadySynced: map[string]bool{"foo": true}, + expectedResults: checkForNothing, + }, + { + name: "managed onstart apiservice not present at start without a matching request should no-op", + apiServiceName: "foo", + addAPIServices: []*apiregistration.APIService{newAPIService("foo")}, + updateAPIServices: []*apiregistration.APIService{newAutoRegisterManagedOnStartAPIService("foo")}, + addSyncAPIServices: []*apiregistration.APIService{}, + delSyncAPIServices: []string{}, + presentAtStart: map[string]bool{}, + alreadySynced: map[string]bool{}, + expectedResults: checkForNothing, + }, + { + name: "modifying onstart it should result in stomping", + apiServiceName: "foo", + addAPIServices: []*apiregistration.APIService{}, + updateAPIServices: []*apiregistration.APIService{newAutoRegisterManagedModifiedAPIService("foo")}, + addSyncOnStartAPIServices: []*apiregistration.APIService{newAutoRegisterManagedOnStartAPIService("foo")}, + delSyncAPIServices: []string{}, + expectedResults: checkForUpdate, + }, + { + name: "modifying onstart already synced should no-op", + apiServiceName: "foo", + addAPIServices: []*apiregistration.APIService{}, + updateAPIServices: []*apiregistration.APIService{newAutoRegisterManagedModifiedAPIService("foo")}, + addSyncOnStartAPIServices: []*apiregistration.APIService{newAutoRegisterManagedOnStartAPIService("foo")}, + delSyncAPIServices: []string{}, + alreadySynced: map[string]bool{"foo": true}, + expectedResults: checkForNothing, + }, } for _, test := range tests { fakeClient := fake.NewSimpleClientset() apiServiceIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc}) - c := autoRegisterController{ + alreadySynced := map[string]bool{} + for k, v := range test.alreadySynced { + alreadySynced[k] = v + } + + presentAtStart := map[string]bool{} + for k, v := range test.presentAtStart { + presentAtStart[k] = v + } + + c := &autoRegisterController{ apiServiceClient: fakeClient.Apiregistration(), apiServiceLister: listers.NewAPIServiceLister(apiServiceIndexer), apiServicesToSync: map[string]*apiregistration.APIService{}, queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "autoregister"), + + syncedSuccessfullyLock: &sync.RWMutex{}, + syncedSuccessfully: alreadySynced, + + apiServicesAtStart: presentAtStart, } for _, obj := range test.addAPIServices { @@ -217,6 +345,10 @@ func TestSync(t *testing.T) { c.AddAPIServiceToSync(obj) } + for _, obj := range test.addSyncOnStartAPIServices { + c.AddAPIServiceToSyncOnStart(obj) + } + for _, objName := range test.delSyncAPIServices { c.RemoveAPIServiceToSync(objName) }