From 34f57377ae8692fc8a4a6e887e12d78ea5739464 Mon Sep 17 00:00:00 2001 From: "Dr. Stefan Schimanski" Date: Mon, 16 Mar 2020 10:31:33 +0100 Subject: [PATCH] apiextensions: wait for complete discovery endpoint --- .../pkg/apiserver/apiserver.go | 11 ++++++-- .../customresource_discovery_controller.go | 27 ++++++++++++++++++- 2 files changed, 35 insertions(+), 3 deletions(-) diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go index 390b5a4471b..2eb164c6a7b 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/apiserver.go @@ -207,7 +207,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler) s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler) - crdController := NewDiscoveryController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler) + discoveryController := NewDiscoveryController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler) namingController := status.NewNamingConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1()) nonStructuralSchemaController := nonstructuralschema.NewConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1()) apiApprovalController := apiapproval.NewKubernetesAPIApprovalPolicyConformantConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1()) @@ -231,12 +231,19 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget) go openapiController.Run(s.GenericAPIServer.StaticOpenAPISpec, s.GenericAPIServer.OpenAPIVersionedService, context.StopCh) } - go crdController.Run(context.StopCh) go namingController.Run(context.StopCh) go establishingController.Run(context.StopCh) go nonStructuralSchemaController.Run(5, context.StopCh) go apiApprovalController.Run(5, context.StopCh) go finalizingController.Run(5, context.StopCh) + + discoverySyncedCh := make(chan struct{}) + go discoveryController.Run(context.StopCh, discoverySyncedCh) + select { + case <-context.StopCh: + case <-discoverySyncedCh: + } + return nil }) // we don't want to report healthy until we can handle all CRDs that have already been registered. Waiting for the informer diff --git a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_discovery_controller.go b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_discovery_controller.go index f897abd9ac5..3dedf558297 100644 --- a/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_discovery_controller.go +++ b/staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/customresource_discovery_controller.go @@ -201,7 +201,7 @@ func sortGroupDiscoveryByKubeAwareVersion(gd []metav1.GroupVersionForDiscovery) }) } -func (c *DiscoveryController) Run(stopCh <-chan struct{}) { +func (c *DiscoveryController) Run(stopCh <-chan struct{}, synchedCh chan<- struct{}) { defer utilruntime.HandleCrash() defer c.queue.ShutDown() defer klog.Infof("Shutting down DiscoveryController") @@ -213,6 +213,31 @@ func (c *DiscoveryController) Run(stopCh <-chan struct{}) { return } + // initially sync all group versions to make sure we serve complete discovery + if err := wait.PollImmediateUntil(time.Second, func() (bool, error) { + crds, err := c.crdLister.List(labels.Everything()) + if err != nil { + utilruntime.HandleError(fmt.Errorf("failed to initially list CRDs: %v", err)) + return false, nil + } + for _, crd := range crds { + for _, v := range crd.Spec.Versions { + gv := schema.GroupVersion{crd.Spec.Group, v.Name} + if err := c.sync(gv); err != nil { + utilruntime.HandleError(fmt.Errorf("failed to initially sync CRD version %v: %v", gv, err)) + return false, nil + } + } + } + return true, nil + }, stopCh); err == wait.ErrWaitTimeout { + utilruntime.HandleError(fmt.Errorf("timed out waiting for discovery endpoint to initialize")) + return + } else if err != nil { + panic(fmt.Errorf("unexpected error: %v", err)) + } + close(synchedCh) + // only start one worker thread since its a slow moving API go wait.Until(c.runWorker, time.Second, stopCh)