mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-21 02:41:25 +00:00
Merge pull request #105653 from p0lyn0mial/crd-503-refactor
apiextentionserver: refactor returning 503 for custom resource requests during server start
This commit is contained in:
commit
07d3a92ce6
@ -133,6 +133,13 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// hasCRDInformerSyncedSignal is closed when the CRD informer this server uses has been fully synchronized.
|
||||
// It ensures that requests to potential custom resource endpoints while the server hasn't installed all known HTTP paths get a 503 error instead of a 404
|
||||
hasCRDInformerSyncedSignal := make(chan struct{})
|
||||
if err := genericServer.RegisterMuxAndDiscoveryCompleteSignal("CRDInformerHasNotSynced", hasCRDInformerSyncedSignal); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
s := &CustomResourceDefinitions{
|
||||
GenericAPIServer: genericServer,
|
||||
}
|
||||
@ -245,7 +252,11 @@ func (c completedConfig) New(delegationTarget genericapiserver.DelegationTarget)
|
||||
// but we won't go healthy until we can handle the ones already present.
|
||||
s.GenericAPIServer.AddPostStartHookOrDie("crd-informer-synced", func(context genericapiserver.PostStartHookContext) error {
|
||||
return wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) {
|
||||
return s.Informers.Apiextensions().V1().CustomResourceDefinitions().Informer().HasSynced(), nil
|
||||
if s.Informers.Apiextensions().V1().CustomResourceDefinitions().Informer().HasSynced() {
|
||||
close(hasCRDInformerSyncedSignal)
|
||||
return true, nil
|
||||
}
|
||||
return false, nil
|
||||
}, context.StopCh)
|
||||
})
|
||||
|
||||
|
@ -104,7 +104,6 @@ type crdHandler struct {
|
||||
customStorage atomic.Value
|
||||
|
||||
crdLister listers.CustomResourceDefinitionLister
|
||||
hasSynced func() bool
|
||||
|
||||
delegate http.Handler
|
||||
restOptionsGetter generic.RESTOptionsGetter
|
||||
@ -192,7 +191,6 @@ func NewCustomResourceDefinitionHandler(
|
||||
groupDiscoveryHandler: groupDiscoveryHandler,
|
||||
customStorage: atomic.Value{},
|
||||
crdLister: crdInformer.Lister(),
|
||||
hasSynced: crdInformer.Informer().HasSynced,
|
||||
delegate: delegate,
|
||||
restOptionsGetter: restOptionsGetter,
|
||||
admission: admission,
|
||||
@ -246,19 +244,11 @@ func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
// only match /apis/<group>/<version>
|
||||
// only registered under /apis
|
||||
if len(pathParts) == 3 {
|
||||
if !r.hasSynced() {
|
||||
responsewriters.ErrorNegotiated(serverStartingError(), Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req)
|
||||
return
|
||||
}
|
||||
r.versionDiscoveryHandler.ServeHTTP(w, req)
|
||||
return
|
||||
}
|
||||
// only match /apis/<group>
|
||||
if len(pathParts) == 2 {
|
||||
if !r.hasSynced() {
|
||||
responsewriters.ErrorNegotiated(serverStartingError(), Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req)
|
||||
return
|
||||
}
|
||||
r.groupDiscoveryHandler.ServeHTTP(w, req)
|
||||
return
|
||||
}
|
||||
@ -270,11 +260,6 @@ func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
crdName := requestInfo.Resource + "." + requestInfo.APIGroup
|
||||
crd, err := r.crdLister.Get(crdName)
|
||||
if apierrors.IsNotFound(err) {
|
||||
if !r.hasSynced() {
|
||||
responsewriters.ErrorNegotiated(serverStartingError(), Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req)
|
||||
return
|
||||
}
|
||||
|
||||
r.delegate.ServeHTTP(w, req)
|
||||
return
|
||||
}
|
||||
@ -1362,18 +1347,6 @@ func hasServedCRDVersion(spec *apiextensionsv1.CustomResourceDefinitionSpec, ver
|
||||
return false
|
||||
}
|
||||
|
||||
// serverStartingError returns a ServiceUnavailble error with a retry-after time
|
||||
func serverStartingError() error {
|
||||
err := apierrors.NewServiceUnavailable("server is starting")
|
||||
if err.ErrStatus.Details == nil {
|
||||
err.ErrStatus.Details = &metav1.StatusDetails{}
|
||||
}
|
||||
if err.ErrStatus.Details.RetryAfterSeconds == 0 {
|
||||
err.ErrStatus.Details.RetryAfterSeconds = int32(10)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
// buildOpenAPIModelsForApply constructs openapi models from any validation schemas specified in the custom resource,
|
||||
// and merges it with the models defined in the static OpenAPI spec.
|
||||
// Returns nil models if the ServerSideApply feature is disabled, or the static spec is nil, or an error is encountered.
|
||||
|
@ -151,15 +151,25 @@ func TestRouting(t *testing.T) {
|
||||
crdIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
|
||||
crdLister := listers.NewCustomResourceDefinitionLister(crdIndexer)
|
||||
|
||||
// note that in production we delegate to the special handler that is attached at the end of the delegation chain that checks if the server has installed all known HTTP paths before replying to the client.
|
||||
// it returns 503 if not all registered signals have been ready (closed) otherwise it simply replies with 404.
|
||||
// the apiextentionserver is considered to be initialized once hasCRDInformerSyncedSignal is closed.
|
||||
//
|
||||
// here, in this test the delegate represent the special handler and hasSync represents the signal.
|
||||
// primarily we just want to make sure that the delegate has been called.
|
||||
// the behaviour of the real delegate is tested elsewhere.
|
||||
delegateCalled := false
|
||||
delegate := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
|
||||
delegateCalled = true
|
||||
if !hasSynced {
|
||||
http.Error(w, "", 503)
|
||||
return
|
||||
}
|
||||
http.Error(w, "", 418)
|
||||
})
|
||||
customV1 := schema.GroupVersion{Group: "custom", Version: "v1"}
|
||||
handler := &crdHandler{
|
||||
crdLister: crdLister,
|
||||
hasSynced: func() bool { return hasSynced },
|
||||
delegate: delegate,
|
||||
versionDiscoveryHandler: &versionDiscoveryHandler{
|
||||
discovery: map[schema.GroupVersion]*discovery.APIVersionHandler{
|
||||
@ -209,7 +219,7 @@ func TestRouting(t *testing.T) {
|
||||
HasSynced: false,
|
||||
IsResourceRequest: false,
|
||||
ExpectDelegateCalled: false,
|
||||
ExpectStatus: 503,
|
||||
ExpectStatus: 200,
|
||||
},
|
||||
{
|
||||
Name: "existing group discovery",
|
||||
@ -231,7 +241,7 @@ func TestRouting(t *testing.T) {
|
||||
APIVersion: "",
|
||||
HasSynced: false,
|
||||
IsResourceRequest: false,
|
||||
ExpectDelegateCalled: false,
|
||||
ExpectDelegateCalled: true,
|
||||
ExpectStatus: 503,
|
||||
},
|
||||
{
|
||||
@ -255,7 +265,7 @@ func TestRouting(t *testing.T) {
|
||||
HasSynced: false,
|
||||
IsResourceRequest: false,
|
||||
ExpectDelegateCalled: false,
|
||||
ExpectStatus: 503,
|
||||
ExpectStatus: 200,
|
||||
},
|
||||
{
|
||||
Name: "existing group version discovery",
|
||||
@ -277,7 +287,7 @@ func TestRouting(t *testing.T) {
|
||||
APIVersion: "v1",
|
||||
HasSynced: false,
|
||||
IsResourceRequest: false,
|
||||
ExpectDelegateCalled: false,
|
||||
ExpectDelegateCalled: true,
|
||||
ExpectStatus: 503,
|
||||
},
|
||||
{
|
||||
@ -300,7 +310,7 @@ func TestRouting(t *testing.T) {
|
||||
APIVersion: "v2",
|
||||
HasSynced: false,
|
||||
IsResourceRequest: false,
|
||||
ExpectDelegateCalled: false,
|
||||
ExpectDelegateCalled: true,
|
||||
ExpectStatus: 503,
|
||||
},
|
||||
{
|
||||
@ -325,7 +335,7 @@ func TestRouting(t *testing.T) {
|
||||
Resource: "foos",
|
||||
HasSynced: false,
|
||||
IsResourceRequest: true,
|
||||
ExpectDelegateCalled: false,
|
||||
ExpectDelegateCalled: true,
|
||||
ExpectStatus: 503,
|
||||
},
|
||||
{
|
||||
|
Loading…
Reference in New Issue
Block a user