Merge pull request #89145 from sttts/sttts-apiextensions-discovery-sync

apiextensions: wait for complete discovery endpoint
This commit is contained in:
Kubernetes Prow Robot 2020-03-18 22:24:57 -07:00 committed by GitHub
commit c1a66a4b02
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 35 additions and 3 deletions

View File

@ -207,7 +207,7 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler) s.GenericAPIServer.Handler.NonGoRestfulMux.Handle("/apis", crdHandler)
s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler) s.GenericAPIServer.Handler.NonGoRestfulMux.HandlePrefix("/apis/", crdHandler)
crdController := NewDiscoveryController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler) discoveryController := NewDiscoveryController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), versionDiscoveryHandler, groupDiscoveryHandler)
namingController := status.NewNamingConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1()) namingController := status.NewNamingConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
nonStructuralSchemaController := nonstructuralschema.NewConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1()) nonStructuralSchemaController := nonstructuralschema.NewConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
apiApprovalController := apiapproval.NewKubernetesAPIApprovalPolicyConformantConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1()) apiApprovalController := apiapproval.NewKubernetesAPIApprovalPolicyConformantConditionController(s.Informers.Apiextensions().V1().CustomResourceDefinitions(), crdClient.ApiextensionsV1())
@ -231,12 +231,19 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
go openapiController.Run(s.GenericAPIServer.StaticOpenAPISpec, s.GenericAPIServer.OpenAPIVersionedService, context.StopCh) go openapiController.Run(s.GenericAPIServer.StaticOpenAPISpec, s.GenericAPIServer.OpenAPIVersionedService, context.StopCh)
} }
go crdController.Run(context.StopCh)
go namingController.Run(context.StopCh) go namingController.Run(context.StopCh)
go establishingController.Run(context.StopCh) go establishingController.Run(context.StopCh)
go nonStructuralSchemaController.Run(5, context.StopCh) go nonStructuralSchemaController.Run(5, context.StopCh)
go apiApprovalController.Run(5, context.StopCh) go apiApprovalController.Run(5, context.StopCh)
go finalizingController.Run(5, context.StopCh) go finalizingController.Run(5, context.StopCh)
discoverySyncedCh := make(chan struct{})
go discoveryController.Run(context.StopCh, discoverySyncedCh)
select {
case <-context.StopCh:
case <-discoverySyncedCh:
}
return nil return nil
}) })
// we don't want to report healthy until we can handle all CRDs that have already been registered. Waiting for the informer // we don't want to report healthy until we can handle all CRDs that have already been registered. Waiting for the informer

View File

@ -201,7 +201,7 @@ func sortGroupDiscoveryByKubeAwareVersion(gd []metav1.GroupVersionForDiscovery)
}) })
} }
func (c *DiscoveryController) Run(stopCh <-chan struct{}) { func (c *DiscoveryController) Run(stopCh <-chan struct{}, synchedCh chan<- struct{}) {
defer utilruntime.HandleCrash() defer utilruntime.HandleCrash()
defer c.queue.ShutDown() defer c.queue.ShutDown()
defer klog.Infof("Shutting down DiscoveryController") defer klog.Infof("Shutting down DiscoveryController")
@ -213,6 +213,31 @@ func (c *DiscoveryController) Run(stopCh <-chan struct{}) {
return return
} }
// initially sync all group versions to make sure we serve complete discovery
if err := wait.PollImmediateUntil(time.Second, func() (bool, error) {
crds, err := c.crdLister.List(labels.Everything())
if err != nil {
utilruntime.HandleError(fmt.Errorf("failed to initially list CRDs: %v", err))
return false, nil
}
for _, crd := range crds {
for _, v := range crd.Spec.Versions {
gv := schema.GroupVersion{crd.Spec.Group, v.Name}
if err := c.sync(gv); err != nil {
utilruntime.HandleError(fmt.Errorf("failed to initially sync CRD version %v: %v", gv, err))
return false, nil
}
}
}
return true, nil
}, stopCh); err == wait.ErrWaitTimeout {
utilruntime.HandleError(fmt.Errorf("timed out waiting for discovery endpoint to initialize"))
return
} else if err != nil {
panic(fmt.Errorf("unexpected error: %v", err))
}
close(synchedCh)
// only start one worker thread since its a slow moving API // only start one worker thread since its a slow moving API
go wait.Until(c.runWorker, time.Second, stopCh) go wait.Until(c.runWorker, time.Second, stopCh)