Merge pull request #81244 from liggitt/crd_startup_503

Return 503 for custom resource requests during server start
This commit is contained in:
Kubernetes Prow Robot 2019-08-13 19:36:49 -07:00 committed by GitHub
commit 3f0a486cbf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 381 additions and 9 deletions

View File

@ -99,7 +99,14 @@ go_test(
deps = [
"//staging/src/k8s.io/apiextensions-apiserver/pkg/apis/apiextensions:go_default_library",
"//staging/src/k8s.io/apiextensions-apiserver/pkg/apiserver/conversion:go_default_library",
"//staging/src/k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/internalversion:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library",
"//staging/src/k8s.io/apimachinery/pkg/runtime/serializer/protobuf:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/discovery:go_default_library",
"//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library",
"//staging/src/k8s.io/client-go/tools/cache:go_default_library",
"//vendor/sigs.k8s.io/yaml:go_default_library",
],
)

View File

@ -95,6 +95,7 @@ type crdHandler struct {
customStorage atomic.Value
crdLister listers.CustomResourceDefinitionLister
hasSynced func() bool
delegate http.Handler
restOptionsGetter generic.RESTOptionsGetter
@ -165,6 +166,7 @@ func NewCustomResourceDefinitionHandler(
groupDiscoveryHandler: groupDiscoveryHandler,
customStorage: atomic.Value{},
crdLister: crdInformer.Lister(),
hasSynced: crdInformer.Informer().HasSynced,
delegate: delegate,
restOptionsGetter: restOptionsGetter,
admission: admission,
@ -205,7 +207,10 @@ func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
requestInfo, ok := apirequest.RequestInfoFrom(ctx)
if !ok {
responsewriters.InternalError(w, req, fmt.Errorf("no RequestInfo found in the context"))
responsewriters.ErrorNegotiated(
apierrors.NewInternalError(fmt.Errorf("no RequestInfo found in the context")),
Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req,
)
return
}
if !requestInfo.IsResourceRequest {
@ -213,11 +218,19 @@ func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
// only match /apis/<group>/<version>
// only registered under /apis
if len(pathParts) == 3 {
if !r.hasSynced() {
responsewriters.ErrorNegotiated(serverStartingError(), Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req)
return
}
r.versionDiscoveryHandler.ServeHTTP(w, req)
return
}
// only match /apis/<group>
if len(pathParts) == 2 {
if !r.hasSynced() {
responsewriters.ErrorNegotiated(serverStartingError(), Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req)
return
}
r.groupDiscoveryHandler.ServeHTTP(w, req)
return
}
@ -229,11 +242,20 @@ func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
crdName := requestInfo.Resource + "." + requestInfo.APIGroup
crd, err := r.crdLister.Get(crdName)
if apierrors.IsNotFound(err) {
if !r.hasSynced() {
responsewriters.ErrorNegotiated(serverStartingError(), Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req)
return
}
r.delegate.ServeHTTP(w, req)
return
}
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
utilruntime.HandleError(err)
responsewriters.ErrorNegotiated(
apierrors.NewInternalError(fmt.Errorf("error resolving resource")),
Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req,
)
return
}
@ -272,7 +294,11 @@ func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
return
}
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
utilruntime.HandleError(err)
responsewriters.ErrorNegotiated(
apierrors.NewInternalError(fmt.Errorf("error resolving resource")),
Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req,
)
return
}
if !hasServedCRDVersion(crdInfo.spec, requestInfo.APIVersion) {
@ -296,7 +322,10 @@ func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
subresources, err := apiextensions.GetSubresourcesForVersion(crd, requestInfo.APIVersion)
if err != nil {
utilruntime.HandleError(err)
http.Error(w, "the server could not properly serve the CR subresources", http.StatusInternalServerError)
responsewriters.ErrorNegotiated(
apierrors.NewInternalError(fmt.Errorf("could not properly serve the subresource")),
Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req,
)
return
}
switch {
@ -307,7 +336,10 @@ func (r *crdHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
case len(subresource) == 0:
handlerFunc = r.serveResource(w, req, requestInfo, crdInfo, terminating, supportedTypes)
default:
http.Error(w, "the server could not find the requested resource", http.StatusNotFound)
responsewriters.ErrorNegotiated(
apierrors.NewNotFound(schema.GroupResource{Group: requestInfo.APIGroup, Resource: requestInfo.Resource}, requestInfo.Name),
Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req,
)
}
if handlerFunc != nil {
@ -333,7 +365,9 @@ func (r *crdHandler) serveResource(w http.ResponseWriter, req *http.Request, req
return handlers.ListResource(storage, storage, requestScope, forceWatch, r.minRequestTimeout)
case "create":
if terminating {
http.Error(w, fmt.Sprintf("%v not allowed while CustomResourceDefinition is terminating", requestInfo.Verb), http.StatusMethodNotAllowed)
err := apierrors.NewMethodNotSupported(schema.GroupResource{Group: requestInfo.APIGroup, Resource: requestInfo.Resource}, requestInfo.Verb)
err.ErrStatus.Message = fmt.Sprintf("%v not allowed while custom resource definition is terminating", requestInfo.Verb)
responsewriters.ErrorNegotiated(err, Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req)
return nil
}
return handlers.CreateResource(storage, requestScope, r.admission)
@ -348,7 +382,10 @@ func (r *crdHandler) serveResource(w http.ResponseWriter, req *http.Request, req
checkBody := true
return handlers.DeleteCollection(storage, checkBody, requestScope, r.admission)
default:
http.Error(w, fmt.Sprintf("unhandled verb %q", requestInfo.Verb), http.StatusMethodNotAllowed)
responsewriters.ErrorNegotiated(
apierrors.NewMethodNotSupported(schema.GroupResource{Group: requestInfo.APIGroup, Resource: requestInfo.Resource}, requestInfo.Verb),
Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req,
)
return nil
}
}
@ -365,7 +402,10 @@ func (r *crdHandler) serveStatus(w http.ResponseWriter, req *http.Request, reque
case "patch":
return handlers.PatchResource(storage, requestScope, r.admission, supportedTypes)
default:
http.Error(w, fmt.Sprintf("unhandled verb %q", requestInfo.Verb), http.StatusMethodNotAllowed)
responsewriters.ErrorNegotiated(
apierrors.NewMethodNotSupported(schema.GroupResource{Group: requestInfo.APIGroup, Resource: requestInfo.Resource}, requestInfo.Verb),
Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req,
)
return nil
}
}
@ -382,7 +422,10 @@ func (r *crdHandler) serveScale(w http.ResponseWriter, req *http.Request, reques
case "patch":
return handlers.PatchResource(storage, requestScope, r.admission, supportedTypes)
default:
http.Error(w, fmt.Sprintf("unhandled verb %q", requestInfo.Verb), http.StatusMethodNotAllowed)
responsewriters.ErrorNegotiated(
apierrors.NewMethodNotSupported(schema.GroupResource{Group: requestInfo.APIGroup, Resource: requestInfo.Resource}, requestInfo.Verb),
Codecs, schema.GroupVersion{Group: requestInfo.APIGroup, Version: requestInfo.APIVersion}, w, req,
)
return nil
}
}
@ -1125,3 +1168,15 @@ func hasServedCRDVersion(spec *apiextensions.CustomResourceDefinitionSpec, versi
}
return false
}
// serverStartingError returns a ServiceUnavailble error with a retry-after time
func serverStartingError() error {
err := apierrors.NewServiceUnavailable("server is starting")
if err.ErrStatus.Details == nil {
err.ErrStatus.Details = &metav1.StatusDetails{}
}
if err.ErrStatus.Details.RetryAfterSeconds == 0 {
err.ErrStatus.Details.RetryAfterSeconds = int32(10)
}
return err
}

View File

@ -17,11 +17,23 @@ limitations under the License.
package apiserver
import (
"encoding/json"
"io"
"io/ioutil"
"net/http"
"net/http/httptest"
"sigs.k8s.io/yaml"
"testing"
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions"
"k8s.io/apiextensions-apiserver/pkg/apiserver/conversion"
listers "k8s.io/apiextensions-apiserver/pkg/client/listers/apiextensions/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/runtime/serializer/protobuf"
"k8s.io/apiserver/pkg/endpoints/discovery"
apirequest "k8s.io/apiserver/pkg/endpoints/request"
"k8s.io/client-go/tools/cache"
)
func TestConvertFieldLabel(t *testing.T) {
@ -105,3 +117,301 @@ func TestConvertFieldLabel(t *testing.T) {
})
}
}
func TestRouting(t *testing.T) {
hasSynced := false
crdIndexer := cache.NewIndexer(cache.MetaNamespaceKeyFunc, cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc})
crdLister := listers.NewCustomResourceDefinitionLister(crdIndexer)
delegateCalled := false
delegate := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
delegateCalled = true
http.Error(w, "", 418)
})
customV1 := schema.GroupVersion{Group: "custom", Version: "v1"}
handler := &crdHandler{
crdLister: crdLister,
hasSynced: func() bool { return hasSynced },
delegate: delegate,
versionDiscoveryHandler: &versionDiscoveryHandler{
discovery: map[schema.GroupVersion]*discovery.APIVersionHandler{
customV1: discovery.NewAPIVersionHandler(Codecs, customV1, discovery.APIResourceListerFunc(func() []metav1.APIResource {
return nil
})),
},
delegate: delegate,
},
groupDiscoveryHandler: &groupDiscoveryHandler{
discovery: map[string]*discovery.APIGroupHandler{
"custom": discovery.NewAPIGroupHandler(Codecs, metav1.APIGroup{
Name: customV1.Group,
Versions: []metav1.GroupVersionForDiscovery{{GroupVersion: customV1.String(), Version: customV1.Version}},
PreferredVersion: metav1.GroupVersionForDiscovery{GroupVersion: customV1.String(), Version: customV1.Version},
}),
},
delegate: delegate,
},
}
testcases := []struct {
Name string
Method string
Path string
Headers map[string]string
Body io.Reader
APIGroup string
APIVersion string
Verb string
Resource string
IsResourceRequest bool
HasSynced bool
ExpectStatus int
ExpectResponse func(*testing.T, *http.Response, []byte)
ExpectDelegateCalled bool
}{
{
Name: "existing group discovery, presync",
Method: "GET",
Path: "/apis/custom",
APIGroup: "custom",
APIVersion: "",
HasSynced: false,
IsResourceRequest: false,
ExpectDelegateCalled: false,
ExpectStatus: 503,
},
{
Name: "existing group discovery",
Method: "GET",
Path: "/apis/custom",
APIGroup: "custom",
APIVersion: "",
HasSynced: true,
IsResourceRequest: false,
ExpectDelegateCalled: false,
ExpectStatus: 200,
},
{
Name: "nonexisting group discovery, presync",
Method: "GET",
Path: "/apis/other",
APIGroup: "other",
APIVersion: "",
HasSynced: false,
IsResourceRequest: false,
ExpectDelegateCalled: false,
ExpectStatus: 503,
},
{
Name: "nonexisting group discovery",
Method: "GET",
Path: "/apis/other",
APIGroup: "other",
APIVersion: "",
HasSynced: true,
IsResourceRequest: false,
ExpectDelegateCalled: true,
ExpectStatus: 418,
},
{
Name: "existing group version discovery, presync",
Method: "GET",
Path: "/apis/custom/v1",
APIGroup: "custom",
APIVersion: "v1",
HasSynced: false,
IsResourceRequest: false,
ExpectDelegateCalled: false,
ExpectStatus: 503,
},
{
Name: "existing group version discovery",
Method: "GET",
Path: "/apis/custom/v1",
APIGroup: "custom",
APIVersion: "v1",
HasSynced: true,
IsResourceRequest: false,
ExpectDelegateCalled: false,
ExpectStatus: 200,
},
{
Name: "nonexisting group version discovery, presync",
Method: "GET",
Path: "/apis/other/v1",
APIGroup: "other",
APIVersion: "v1",
HasSynced: false,
IsResourceRequest: false,
ExpectDelegateCalled: false,
ExpectStatus: 503,
},
{
Name: "nonexisting group version discovery",
Method: "GET",
Path: "/apis/other/v1",
APIGroup: "other",
APIVersion: "v1",
HasSynced: true,
IsResourceRequest: false,
ExpectDelegateCalled: true,
ExpectStatus: 418,
},
{
Name: "existing group, nonexisting version discovery, presync",
Method: "GET",
Path: "/apis/custom/v2",
APIGroup: "custom",
APIVersion: "v2",
HasSynced: false,
IsResourceRequest: false,
ExpectDelegateCalled: false,
ExpectStatus: 503,
},
{
Name: "existing group, nonexisting version discovery",
Method: "GET",
Path: "/apis/custom/v2",
APIGroup: "custom",
APIVersion: "v2",
HasSynced: true,
IsResourceRequest: false,
ExpectDelegateCalled: true,
ExpectStatus: 418,
},
{
Name: "nonexisting group, resource request, presync",
Method: "GET",
Path: "/apis/custom/v2/foos",
APIGroup: "custom",
APIVersion: "v2",
Verb: "list",
Resource: "foos",
HasSynced: false,
IsResourceRequest: true,
ExpectDelegateCalled: false,
ExpectStatus: 503,
},
{
Name: "nonexisting group, resource request",
Method: "GET",
Path: "/apis/custom/v2/foos",
APIGroup: "custom",
APIVersion: "v2",
Verb: "list",
Resource: "foos",
HasSynced: true,
IsResourceRequest: true,
ExpectDelegateCalled: true,
ExpectStatus: 418,
},
}
for _, tc := range testcases {
t.Run(tc.Name, func(t *testing.T) {
for _, contentType := range []string{"json", "yaml", "proto", "unknown"} {
t.Run(contentType, func(t *testing.T) {
delegateCalled = false
hasSynced = tc.HasSynced
recorder := httptest.NewRecorder()
req := httptest.NewRequest(tc.Method, tc.Path, tc.Body)
for k, v := range tc.Headers {
req.Header.Set(k, v)
}
expectStatus := tc.ExpectStatus
switch contentType {
case "json":
req.Header.Set("Accept", "application/json")
case "yaml":
req.Header.Set("Accept", "application/yaml")
case "proto":
req.Header.Set("Accept", "application/vnd.kubernetes.protobuf, application/json")
case "unknown":
req.Header.Set("Accept", "application/vnd.kubernetes.unknown")
// rather than success, we'll get a not supported error
if expectStatus == 200 {
expectStatus = 406
}
default:
t.Fatalf("unknown content type %v", contentType)
}
req = req.WithContext(apirequest.WithRequestInfo(req.Context(), &apirequest.RequestInfo{
Verb: tc.Verb,
Resource: tc.Resource,
APIGroup: tc.APIGroup,
APIVersion: tc.APIVersion,
IsResourceRequest: tc.IsResourceRequest,
Path: tc.Path,
}))
handler.ServeHTTP(recorder, req)
if tc.ExpectDelegateCalled != delegateCalled {
t.Errorf("expected delegated called %v, got %v", tc.ExpectDelegateCalled, delegateCalled)
}
result := recorder.Result()
content, _ := ioutil.ReadAll(result.Body)
if e, a := expectStatus, result.StatusCode; e != a {
t.Log(string(content))
t.Errorf("expected %v, got %v", e, a)
}
if tc.ExpectResponse != nil {
tc.ExpectResponse(t, result, content)
}
// Make sure error responses come back with status objects in all encodings, including unknown encodings
if !delegateCalled && expectStatus >= 300 {
status := &metav1.Status{}
switch contentType {
// unknown accept headers fall back to json errors
case "json", "unknown":
if e, a := "application/json", result.Header.Get("Content-Type"); e != a {
t.Errorf("expected Content-Type %v, got %v", e, a)
}
if err := json.Unmarshal(content, status); err != nil {
t.Fatal(err)
}
case "yaml":
if e, a := "application/yaml", result.Header.Get("Content-Type"); e != a {
t.Errorf("expected Content-Type %v, got %v", e, a)
}
if err := yaml.Unmarshal(content, status); err != nil {
t.Fatal(err)
}
case "proto":
if e, a := "application/vnd.kubernetes.protobuf", result.Header.Get("Content-Type"); e != a {
t.Errorf("expected Content-Type %v, got %v", e, a)
}
if _, _, err := protobuf.NewSerializer(Scheme, Scheme).Decode(content, nil, status); err != nil {
t.Fatal(err)
}
default:
t.Fatalf("unknown content type %v", contentType)
}
if e, a := metav1.Unversioned.WithKind("Status"), status.GroupVersionKind(); e != a {
t.Errorf("expected %#v, got %#v", e, a)
}
if int(status.Code) != int(expectStatus) {
t.Errorf("expected %v, got %v", expectStatus, status.Code)
}
}
})
}
})
}
}