diff --git a/cmd/kube-apiserver/app/BUILD b/cmd/kube-apiserver/app/BUILD index 563922ce5cb..d816bb0a758 100644 --- a/cmd/kube-apiserver/app/BUILD +++ b/cmd/kube-apiserver/app/BUILD @@ -50,7 +50,6 @@ go_library( "//vendor/k8s.io/apiextensions-apiserver/pkg/client/informers/internalversion:go_default_library", "//vendor/k8s.io/apiextensions-apiserver/pkg/cmd/server:go_default_library", "//vendor/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", - "//vendor/k8s.io/apimachinery/pkg/labels:go_default_library", "//vendor/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/errors:go_default_library", "//vendor/k8s.io/apimachinery/pkg/util/net:go_default_library", @@ -68,10 +67,12 @@ go_library( "//vendor/k8s.io/apiserver/pkg/storage/etcd3/preflight:go_default_library", "//vendor/k8s.io/client-go/informers:go_default_library", "//vendor/k8s.io/client-go/kubernetes:go_default_library", + "//vendor/k8s.io/client-go/tools/cache:go_default_library", "//vendor/k8s.io/kube-aggregator/pkg/apis/apiregistration:go_default_library", "//vendor/k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1:go_default_library", "//vendor/k8s.io/kube-aggregator/pkg/apiserver:go_default_library", "//vendor/k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset/typed/apiregistration/internalversion:go_default_library", + "//vendor/k8s.io/kube-aggregator/pkg/client/informers/internalversion/apiregistration/internalversion:go_default_library", "//vendor/k8s.io/kube-aggregator/pkg/controllers/autoregister:go_default_library", "//vendor/k8s.io/kube-openapi/pkg/common:go_default_library", ], diff --git a/cmd/kube-apiserver/app/aggregator.go b/cmd/kube-apiserver/app/aggregator.go index 2be47d681da..110dfd86b24 100644 --- a/cmd/kube-apiserver/app/aggregator.go +++ b/cmd/kube-apiserver/app/aggregator.go @@ -24,21 +24,24 @@ import ( "io/ioutil" "net/http" "strings" + "sync" "github.com/golang/glog" apiextensionsinformers "k8s.io/apiextensions-apiserver/pkg/client/informers/internalversion" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" - "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/sets" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/healthz" genericoptions "k8s.io/apiserver/pkg/server/options" kubeexternalinformers "k8s.io/client-go/informers" + "k8s.io/client-go/tools/cache" "k8s.io/kube-aggregator/pkg/apis/apiregistration" "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1" aggregatorapiserver "k8s.io/kube-aggregator/pkg/apiserver" apiregistrationclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/internalclientset/typed/apiregistration/internalversion" + informers "k8s.io/kube-aggregator/pkg/client/informers/internalversion/apiregistration/internalversion" "k8s.io/kube-aggregator/pkg/controllers/autoregister" "k8s.io/kubernetes/cmd/kube-apiserver/app/options" "k8s.io/kubernetes/pkg/master/controller/crdregistration" @@ -106,35 +109,14 @@ func createAggregatorServer(aggregatorConfig *aggregatorapiserver.Config, delega go crdRegistrationController.Run(5, context.StopCh) return nil }) - aggregatorServer.GenericAPIServer.AddHealthzChecks(healthz.NamedCheck("autoregister-completion", func(r *http.Request) error { - items, err := aggregatorServer.APIRegistrationInformers.Apiregistration().InternalVersion().APIServices().Lister().List(labels.Everything()) - if err != nil { - return err - } - missing := []apiregistration.APIService{} - for _, apiService := range apiServices { - found := false - for _, item := range items { - if item.Name != apiService.Name { - continue - } - if apiregistration.IsAPIServiceConditionTrue(item, apiregistration.Available) { - found = true - break - } - } - - if !found { - missing = append(missing, *apiService) - } - } - - if len(missing) > 0 { - return fmt.Errorf("missing APIService: %v", missing) - } - return nil - })) + aggregatorServer.GenericAPIServer.AddHealthzChecks( + makeAPIServiceAvailableHealthzCheck( + "autoregister-completion", + apiServices, + aggregatorServer.APIRegistrationInformers.Apiregistration().InternalVersion().APIServices(), + ), + ) return aggregatorServer, nil } @@ -158,6 +140,45 @@ func makeAPIService(gv schema.GroupVersion) *apiregistration.APIService { } } +// makeAPIServiceAvailableHealthzCheck returns a healthz check that returns healthy +// once all of the specified services have been observed to be available at least once. +func makeAPIServiceAvailableHealthzCheck(name string, apiServices []*apiregistration.APIService, apiServiceInformer informers.APIServiceInformer) healthz.HealthzChecker { + // Track the auto-registered API services that have not been observed to be available yet + pendingServiceNamesLock := &sync.RWMutex{} + pendingServiceNames := sets.NewString() + for _, service := range apiServices { + pendingServiceNames.Insert(service.Name) + } + + // When an APIService in the list is seen as available, remove it from the pending list + handleAPIServiceChange := func(service *apiregistration.APIService) { + pendingServiceNamesLock.Lock() + defer pendingServiceNamesLock.Unlock() + if !pendingServiceNames.Has(service.Name) { + return + } + if apiregistration.IsAPIServiceConditionTrue(service, apiregistration.Available) { + pendingServiceNames.Delete(service.Name) + } + } + + // Watch add/update events for APIServices + apiServiceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ + AddFunc: func(obj interface{}) { handleAPIServiceChange(obj.(*apiregistration.APIService)) }, + UpdateFunc: func(old, new interface{}) { handleAPIServiceChange(new.(*apiregistration.APIService)) }, + }) + + // Don't return healthy until the pending list is empty + return healthz.NamedCheck(name, func(r *http.Request) error { + pendingServiceNamesLock.RLock() + defer pendingServiceNamesLock.RUnlock() + if pendingServiceNames.Len() > 0 { + return fmt.Errorf("missing APIService: %v", pendingServiceNames.List()) + } + return nil + }) +} + type priority struct { group int32 version int32