From 56369375e0ce4c87ad7cb443a58c566c406f6cc0 Mon Sep 17 00:00:00 2001 From: Chao Xu Date: Mon, 17 Feb 2020 14:09:50 -0800 Subject: [PATCH 01/11] Add a feature gate --- staging/src/k8s.io/apiserver/pkg/features/kube_features.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go index a45bfa83c37..7dc745fa7ec 100644 --- a/staging/src/k8s.io/apiserver/pkg/features/kube_features.go +++ b/staging/src/k8s.io/apiserver/pkg/features/kube_features.go @@ -107,6 +107,12 @@ const ( // document. StorageVersionHash featuregate.Feature = "StorageVersionHash" + // owner: @caesarxuchao @roycaihw + // alpha: v1.20 + // + // Enable the storage version API. + StorageVersionAPI featuregate.Feature = "StorageVersionAPI" + // owner: @wojtek-t // alpha: v1.15 // beta: v1.16 @@ -173,6 +179,7 @@ var defaultKubernetesFeatureGates = map[featuregate.Feature]featuregate.FeatureS RemainingItemCount: {Default: true, PreRelease: featuregate.Beta}, ServerSideApply: {Default: true, PreRelease: featuregate.Beta}, StorageVersionHash: {Default: true, PreRelease: featuregate.Beta}, + StorageVersionAPI: {Default: false, PreRelease: featuregate.Alpha}, WatchBookmark: {Default: true, PreRelease: featuregate.GA, LockToDefault: true}, APIPriorityAndFairness: {Default: false, PreRelease: featuregate.Alpha}, RemoveSelfLink: {Default: true, PreRelease: featuregate.Beta}, From 369475681679e1cb68c23c3355542599a3c0e1e3 Mon Sep 17 00:00:00 2001 From: Chao Xu Date: Mon, 24 Feb 2020 15:35:14 -0800 Subject: [PATCH 02/11] Collect storage versions as ResourceInfo when installing API endpoints. Co-authored-by: Haowei Cai --- .../apiserver/pkg/endpoints/apiserver_test.go | 12 +- .../apiserver/pkg/endpoints/groupversion.go | 20 ++- .../apiserver/pkg/endpoints/installer.go | 118 +++++++++++------- .../apiserver/pkg/server/genericapiserver.go | 16 ++- .../apiserver/pkg/storageversion/manager.go | 72 +++++++++-- .../pkg/storageversion/manager_test.go | 62 +++++++++ 6 files changed, 238 insertions(+), 62 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/storageversion/manager_test.go diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go index 15786c002d0..f0da9ede17c 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go @@ -254,7 +254,7 @@ func handleInternal(storage map[string]rest.Storage, admissionControl admission. group.GroupVersion = grouplessGroupVersion group.OptionsExternalVersion = &grouplessGroupVersion group.Serializer = codecs - if err := (&group).InstallREST(container); err != nil { + if _, err := (&group).InstallREST(container); err != nil { panic(fmt.Sprintf("unable to install container %s: %v", group.GroupVersion, err)) } } @@ -266,7 +266,7 @@ func handleInternal(storage map[string]rest.Storage, admissionControl admission. group.GroupVersion = testGroupVersion group.OptionsExternalVersion = &testGroupVersion group.Serializer = codecs - if err := (&group).InstallREST(container); err != nil { + if _, err := (&group).InstallREST(container); err != nil { panic(fmt.Sprintf("unable to install container %s: %v", group.GroupVersion, err)) } } @@ -278,7 +278,7 @@ func handleInternal(storage map[string]rest.Storage, admissionControl admission. group.GroupVersion = newGroupVersion group.OptionsExternalVersion = &newGroupVersion group.Serializer = codecs - if err := (&group).InstallREST(container); err != nil { + if _, err := (&group).InstallREST(container); err != nil { panic(fmt.Sprintf("unable to install container %s: %v", group.GroupVersion, err)) } } @@ -3678,7 +3678,7 @@ func TestParentResourceIsRequired(t *testing.T) { ParameterCodec: parameterCodec, } container := restful.NewContainer() - if err := group.InstallREST(container); err == nil { + if _, err := group.InstallREST(container); err == nil { t.Fatal("expected error") } @@ -3710,7 +3710,7 @@ func TestParentResourceIsRequired(t *testing.T) { ParameterCodec: parameterCodec, } container = restful.NewContainer() - if err := group.InstallREST(container); err != nil { + if _, err := group.InstallREST(container); err != nil { t.Fatal(err) } @@ -4566,7 +4566,7 @@ func TestXGSubresource(t *testing.T) { Serializer: codecs, } - if err := (&group).InstallREST(container); err != nil { + if _, err := (&group).InstallREST(container); err != nil { panic(fmt.Sprintf("unable to install container %s: %v", group.GroupVersion, err)) } diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/groupversion.go b/staging/src/k8s.io/apiserver/pkg/endpoints/groupversion.go index e9c57f18e3a..22b97366142 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/groupversion.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/groupversion.go @@ -32,6 +32,7 @@ import ( "k8s.io/apiserver/pkg/endpoints/discovery" "k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager" "k8s.io/apiserver/pkg/registry/rest" + "k8s.io/apiserver/pkg/storageversion" openapiproto "k8s.io/kube-openapi/pkg/util/proto" ) @@ -96,7 +97,7 @@ type APIGroupVersion struct { // InstallREST registers the REST handlers (storage, watch, proxy and redirect) into a restful Container. // It is expected that the provided path root prefix will serve all operations. Root MUST NOT end // in a slash. -func (g *APIGroupVersion) InstallREST(container *restful.Container) error { +func (g *APIGroupVersion) InstallREST(container *restful.Container) ([]*storageversion.ResourceInfo, error) { prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version) installer := &APIInstaller{ group: g, @@ -104,11 +105,24 @@ func (g *APIGroupVersion) InstallREST(container *restful.Container) error { minRequestTimeout: g.MinRequestTimeout, } - apiResources, ws, registrationErrors := installer.Install() + apiResources, resourceInfos, ws, registrationErrors := installer.Install() versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources}) versionDiscoveryHandler.AddToWebService(ws) container.Add(ws) - return utilerrors.NewAggregate(registrationErrors) + return removeNonPersistedResources(resourceInfos), utilerrors.NewAggregate(registrationErrors) +} + +func removeNonPersistedResources(infos []*storageversion.ResourceInfo) []*storageversion.ResourceInfo { + var filtered []*storageversion.ResourceInfo + for _, info := range infos { + // if EncodingVersion is empty, then the apiserver does not + // need to register this resource via the storage version API, + // thus we can remove it. + if info != nil && len(info.EncodingVersion) > 0 { + filtered = append(filtered, info) + } + } + return filtered } // staticLister implements the APIResourceLister interface diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go b/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go index cabae0c464c..b49a090aa2f 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go @@ -43,6 +43,7 @@ import ( utilwarning "k8s.io/apiserver/pkg/endpoints/warning" "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/registry/rest" + "k8s.io/apiserver/pkg/storageversion" utilfeature "k8s.io/apiserver/pkg/util/feature" versioninfo "k8s.io/component-base/version" ) @@ -94,8 +95,9 @@ var toDiscoveryKubeVerb = map[string]string{ } // Install handlers for API resources. -func (a *APIInstaller) Install() ([]metav1.APIResource, *restful.WebService, []error) { +func (a *APIInstaller) Install() ([]metav1.APIResource, []*storageversion.ResourceInfo, *restful.WebService, []error) { var apiResources []metav1.APIResource + var resourceInfos []*storageversion.ResourceInfo var errors []error ws := a.newWebService() @@ -108,15 +110,18 @@ func (a *APIInstaller) Install() ([]metav1.APIResource, *restful.WebService, []e } sort.Strings(paths) for _, path := range paths { - apiResource, err := a.registerResourceHandlers(path, a.group.Storage[path], ws) + apiResource, resourceInfo, err := a.registerResourceHandlers(path, a.group.Storage[path], ws) if err != nil { errors = append(errors, fmt.Errorf("error in registering resource: %s, %v", path, err)) } if apiResource != nil { apiResources = append(apiResources, *apiResource) } + if resourceInfo != nil { + resourceInfos = append(resourceInfos, resourceInfo) + } } - return apiResources, ws, errors + return apiResources, resourceInfos, ws, errors } // newWebService creates a new restful webservice with the api installer's prefix and version. @@ -182,7 +187,7 @@ func GetResourceKind(groupVersion schema.GroupVersion, storage rest.Storage, typ return fqKindToRegister, nil } -func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, error) { +func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) { admit := a.group.Admit optionsExternalVersion := a.group.GroupVersion @@ -192,19 +197,19 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag resource, subresource, err := splitSubresource(path) if err != nil { - return nil, err + return nil, nil, err } group, version := a.group.GroupVersion.Group, a.group.GroupVersion.Version fqKindToRegister, err := GetResourceKind(a.group.GroupVersion, storage, a.group.Typer) if err != nil { - return nil, err + return nil, nil, err } versionedPtr, err := a.group.Creater.New(fqKindToRegister) if err != nil { - return nil, err + return nil, nil, err } defaultVersionedObject := indirectArbitraryPointer(versionedPtr) kind := fqKindToRegister.Kind @@ -215,18 +220,18 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag if isSubresource { parentStorage, ok := a.group.Storage[resource] if !ok { - return nil, fmt.Errorf("missing parent storage: %q", resource) + return nil, nil, fmt.Errorf("missing parent storage: %q", resource) } scoper, ok := parentStorage.(rest.Scoper) if !ok { - return nil, fmt.Errorf("%q must implement scoper", resource) + return nil, nil, fmt.Errorf("%q must implement scoper", resource) } namespaceScoped = scoper.NamespaceScoped() } else { scoper, ok := storage.(rest.Scoper) if !ok { - return nil, fmt.Errorf("%q must implement scoper", resource) + return nil, nil, fmt.Errorf("%q must implement scoper", resource) } namespaceScoped = scoper.NamespaceScoped() } @@ -255,7 +260,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag versionedExportOptions, err := a.group.Creater.New(optionsExternalVersion.WithKind("ExportOptions")) if err != nil { - return nil, err + return nil, nil, err } if isNamedCreater { @@ -267,30 +272,30 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag list := lister.NewList() listGVKs, _, err := a.group.Typer.ObjectKinds(list) if err != nil { - return nil, err + return nil, nil, err } versionedListPtr, err := a.group.Creater.New(a.group.GroupVersion.WithKind(listGVKs[0].Kind)) if err != nil { - return nil, err + return nil, nil, err } versionedList = indirectArbitraryPointer(versionedListPtr) } versionedListOptions, err := a.group.Creater.New(optionsExternalVersion.WithKind("ListOptions")) if err != nil { - return nil, err + return nil, nil, err } versionedCreateOptions, err := a.group.Creater.New(optionsExternalVersion.WithKind("CreateOptions")) if err != nil { - return nil, err + return nil, nil, err } versionedPatchOptions, err := a.group.Creater.New(optionsExternalVersion.WithKind("PatchOptions")) if err != nil { - return nil, err + return nil, nil, err } versionedUpdateOptions, err := a.group.Creater.New(optionsExternalVersion.WithKind("UpdateOptions")) if err != nil { - return nil, err + return nil, nil, err } var versionedDeleteOptions runtime.Object @@ -299,7 +304,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag if isGracefulDeleter { versionedDeleteOptions, err = a.group.Creater.New(optionsExternalVersion.WithKind("DeleteOptions")) if err != nil { - return nil, err + return nil, nil, err } versionedDeleterObject = indirectArbitraryPointer(versionedDeleteOptions) @@ -310,7 +315,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag versionedStatusPtr, err := a.group.Creater.New(optionsExternalVersion.WithKind("Status")) if err != nil { - return nil, err + return nil, nil, err } versionedStatus := indirectArbitraryPointer(versionedStatusPtr) var ( @@ -323,14 +328,14 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag getOptions, getSubpath, _ = getterWithOptions.NewGetOptions() getOptionsInternalKinds, _, err := a.group.Typer.ObjectKinds(getOptions) if err != nil { - return nil, err + return nil, nil, err } getOptionsInternalKind = getOptionsInternalKinds[0] versionedGetOptions, err = a.group.Creater.New(a.group.GroupVersion.WithKind(getOptionsInternalKind.Kind)) if err != nil { versionedGetOptions, err = a.group.Creater.New(optionsExternalVersion.WithKind(getOptionsInternalKind.Kind)) if err != nil { - return nil, err + return nil, nil, err } } isGetter = true @@ -340,7 +345,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag if isWatcher { versionedWatchEventPtr, err := a.group.Creater.New(a.group.GroupVersion.WithKind("WatchEvent")) if err != nil { - return nil, err + return nil, nil, err } versionedWatchEvent = indirectArbitraryPointer(versionedWatchEventPtr) } @@ -356,7 +361,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag if connectOptions != nil { connectOptionsInternalKinds, _, err := a.group.Typer.ObjectKinds(connectOptions) if err != nil { - return nil, err + return nil, nil, err } connectOptionsInternalKind = connectOptionsInternalKinds[0] @@ -364,7 +369,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag if err != nil { versionedConnectOptions, err = a.group.Creater.New(optionsExternalVersion.WithKind(connectOptionsInternalKind.Kind)) if err != nil { - return nil, err + return nil, nil, err } } } @@ -388,7 +393,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag tableProvider, isTableProvider := storage.(rest.TableConvertor) if isLister && !isTableProvider { // All listers must implement TableProvider - return nil, fmt.Errorf("%q must implement TableConvertor", resource) + return nil, nil, fmt.Errorf("%q must implement TableConvertor", resource) } var apiResource metav1.APIResource @@ -398,7 +403,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag versioner := storageVersionProvider.StorageVersion() gvk, err := getStorageVersionKind(versioner, storage, a.group.Typer) if err != nil { - return nil, err + return nil, nil, err } apiResource.StorageVersionHash = discovery.StorageVersionHash(gvk.Group, gvk.Version, gvk.Kind) } @@ -506,6 +511,29 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag } } + var resourceInfo *storageversion.ResourceInfo + if utilfeature.DefaultFeatureGate.Enabled(features.StorageVersionAPI) && + isStorageVersionProvider && + storageVersionProvider.StorageVersion() != nil { + + versioner := storageVersionProvider.StorageVersion() + encodingGVK, err := getStorageVersionKind(versioner, storage, a.group.Typer) + if err != nil { + return nil, nil, err + } + resourceInfo = &storageversion.ResourceInfo{ + GroupResource: schema.GroupResource{ + Group: a.group.GroupVersion.Group, + Resource: apiResource.Name, + }, + EncodingVersion: encodingGVK.GroupVersion().String(), + // We record EquivalentResourceMapper first instead of calculate + // DecodableVersions immediately because API installation must + // be completed first for us to know equivalent APIs + EquivalentResourceMapper: a.group.EquivalentResourceRegistry, + } + } + // Create Routes for the actions. // TODO: Add status documentation using Returns() // Errors (see api/errors/errors.go as well as go-restful router): @@ -525,7 +553,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag for _, s := range a.group.Serializer.SupportedMediaTypes() { if len(s.MediaTypeSubType) == 0 || len(s.MediaTypeType) == 0 { - return nil, fmt.Errorf("all serializers in the group Serializer must have MediaTypeType and MediaTypeSubType set: %s", s.MediaType) + return nil, nil, fmt.Errorf("all serializers in the group Serializer must have MediaTypeType and MediaTypeSubType set: %s", s.MediaType) } } mediaTypes, streamMediaTypes := negotiation.MediaTypesForSerializer(a.group.Serializer) @@ -573,7 +601,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag isSubresource, ) if err != nil { - return nil, fmt.Errorf("failed to create field manager: %v", err) + return nil, nil, fmt.Errorf("failed to create field manager: %v", err) } } for _, action := range actions { @@ -608,7 +636,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag kubeVerbs[kubeVerb] = struct{}{} } } else { - return nil, fmt.Errorf("unknown action verb for discovery: %s", action.Verb) + return nil, nil, fmt.Errorf("unknown action verb for discovery: %s", action.Verb) } routes := []*restful.RouteBuilder{} @@ -617,12 +645,12 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag if isSubresource { parentStorage, ok := a.group.Storage[resource] if !ok { - return nil, fmt.Errorf("missing parent storage: %q", resource) + return nil, nil, fmt.Errorf("missing parent storage: %q", resource) } fqParentKind, err := GetResourceKind(a.group.GroupVersion, parentStorage, a.group.Typer) if err != nil { - return nil, err + return nil, nil, err } kind = fqParentKind.Kind } @@ -681,12 +709,12 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag Writes(producedObject) if isGetterWithOptions { if err := AddObjectParams(ws, route, versionedGetOptions); err != nil { - return nil, err + return nil, nil, err } } if isExporter { if err := AddObjectParams(ws, route, versionedExportOptions); err != nil { - return nil, err + return nil, nil, err } } addParams(route, action.Params) @@ -708,7 +736,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag Returns(http.StatusOK, "OK", versionedList). Writes(versionedList) if err := AddObjectParams(ws, route, versionedListOptions); err != nil { - return nil, err + return nil, nil, err } switch { case isLister && isWatcher: @@ -747,7 +775,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag Reads(defaultVersionedObject). Writes(producedObject) if err := AddObjectParams(ws, route, versionedUpdateOptions); err != nil { - return nil, err + return nil, nil, err } addParams(route, action.Params) routes = append(routes, route) @@ -778,7 +806,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag Reads(metav1.Patch{}). Writes(producedObject) if err := AddObjectParams(ws, route, versionedPatchOptions); err != nil { - return nil, err + return nil, nil, err } addParams(route, action.Params) routes = append(routes, route) @@ -811,7 +839,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag Reads(defaultVersionedObject). Writes(producedObject) if err := AddObjectParams(ws, route, versionedCreateOptions); err != nil { - return nil, err + return nil, nil, err } addParams(route, action.Params) routes = append(routes, route) @@ -841,7 +869,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag route.Reads(versionedDeleterObject) route.ParameterNamed("body").Required(false) if err := AddObjectParams(ws, route, versionedDeleteOptions); err != nil { - return nil, err + return nil, nil, err } } addParams(route, action.Params) @@ -866,11 +894,11 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag route.Reads(versionedDeleterObject) route.ParameterNamed("body").Required(false) if err := AddObjectParams(ws, route, versionedDeleteOptions); err != nil { - return nil, err + return nil, nil, err } } if err := AddObjectParams(ws, route, versionedListOptions, "watch", "allowWatchBookmarks"); err != nil { - return nil, err + return nil, nil, err } addParams(route, action.Params) routes = append(routes, route) @@ -893,7 +921,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag Returns(http.StatusOK, "OK", versionedWatchEvent). Writes(versionedWatchEvent) if err := AddObjectParams(ws, route, versionedListOptions); err != nil { - return nil, err + return nil, nil, err } addParams(route, action.Params) routes = append(routes, route) @@ -916,7 +944,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag Returns(http.StatusOK, "OK", versionedWatchEvent). Writes(versionedWatchEvent) if err := AddObjectParams(ws, route, versionedListOptions); err != nil { - return nil, err + return nil, nil, err } addParams(route, action.Params) routes = append(routes, route) @@ -943,7 +971,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag Writes(connectProducedObject) if versionedConnectOptions != nil { if err := AddObjectParams(ws, route, versionedConnectOptions); err != nil { - return nil, err + return nil, nil, err } } addParams(route, action.Params) @@ -957,7 +985,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag } } default: - return nil, fmt.Errorf("unrecognized action verb: %s", action.Verb) + return nil, nil, fmt.Errorf("unrecognized action verb: %s", action.Verb) } for _, route := range routes { route.Metadata(ROUTE_META_GVK, metav1.GroupVersionKind{ @@ -993,7 +1021,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag // Record the existence of the GVR and the corresponding GVK a.group.EquivalentResourceRegistry.RegisterKindFor(reqScope.Resource, reqScope.Subresource, fqKindToRegister) - return &apiResource, nil + return &apiResource, resourceInfo, nil } // indirectArbitraryPointer returns *ptrToObject for an arbitrary pointer diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index 07cef25bbea..d0f42684421 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -45,6 +45,7 @@ import ( "k8s.io/apiserver/pkg/registry/rest" "k8s.io/apiserver/pkg/server/healthz" "k8s.io/apiserver/pkg/server/routes" + "k8s.io/apiserver/pkg/storageversion" utilfeature "k8s.io/apiserver/pkg/util/feature" utilopenapi "k8s.io/apiserver/pkg/util/openapi" restclient "k8s.io/client-go/rest" @@ -199,6 +200,9 @@ type GenericAPIServer struct { // APIServerID is the ID of this API server APIServerID string + + // StorageVersionManager holds the storage versions of the API resources installed by this server. + StorageVersionManager storageversion.Manager } // DelegationTarget is an interface which allows for composition of API servers with top level handling that works @@ -409,6 +413,7 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) (<-chan // installAPIResources is a private method for installing the REST storage backing each api groupversionresource func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo, openAPIModels openapiproto.Models) error { + var resourceInfos []*storageversion.ResourceInfo for _, groupVersion := range apiGroupInfo.PrioritizedVersions { if len(apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version]) == 0 { klog.Warningf("Skipping API %v because it has no resources.", groupVersion) @@ -431,9 +436,18 @@ func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *A apiGroupVersion.MaxRequestBodyBytes = s.maxRequestBodyBytes - if err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer); err != nil { + r, err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer) + if err != nil { return fmt.Errorf("unable to setup API %v: %v", apiGroupInfo, err) } + resourceInfos = append(resourceInfos, r...) + } + + if utilfeature.DefaultFeatureGate.Enabled(features.StorageVersionAPI) { + // API installation happens before we start listening on the handlers, + // therefore it is safe to register ResourceInfos here. The handler will block + // write requests until the storage versions of the targeting resources are updated. + s.StorageVersionManager.AddResourceInfo(resourceInfos...) } return nil diff --git a/staging/src/k8s.io/apiserver/pkg/storageversion/manager.go b/staging/src/k8s.io/apiserver/pkg/storageversion/manager.go index 0032a8e768f..03e21a4d6d3 100644 --- a/staging/src/k8s.io/apiserver/pkg/storageversion/manager.go +++ b/staging/src/k8s.io/apiserver/pkg/storageversion/manager.go @@ -18,6 +18,7 @@ package storageversion import ( "fmt" + "sort" "sync" "sync/atomic" @@ -98,7 +99,10 @@ func (s *defaultManager) AddResourceInfo(resources ...*ResourceInfo) { func (s *defaultManager) addPendingManagedStatusLocked(r *ResourceInfo) { gvrs := r.EquivalentResourceMapper.EquivalentResourcesFor(r.GroupResource.WithVersion(""), "") for _, gvr := range gvrs { - s.managedStatus[gvr.GroupResource()] = &updateStatus{} + gr := gvr.GroupResource() + if _, ok := s.managedStatus[gr]; !ok { + s.managedStatus[gr] = &updateStatus{} + } } } @@ -112,22 +116,37 @@ func (s *defaultManager) UpdateStorageVersions(kubeAPIServerClientConfig *rest.C sc := clientset.InternalV1alpha1().StorageVersions() s.mu.RLock() - resources := []*ResourceInfo{} + resources := []ResourceInfo{} for resource := range s.managedResourceInfos { - resources = append(resources, resource) + resources = append(resources, *resource) } s.mu.RUnlock() hasFailure := false - for _, r := range resources { + // Sorting the list to make sure we have a consistent dedup result, and + // therefore avoid creating unnecessarily duplicated StorageVersion objects. + // For example, extensions.ingresses and networking.k8s.io.ingresses share + // the same underlying storage. Without sorting, in an HA cluster, one + // apiserver may dedup and update StorageVersion for extensions.ingresses, + // while another apiserver may dedup and update StorageVersion for + // networking.k8s.io.ingresses. The storage migrator (which migrates objects + // per GroupResource) will migrate these resources twice, since both + // StorageVersion objects have CommonEncodingVersion (each with one server registered). + sortResourceInfosByGroupResource(resources) + for _, r := range dedupResourceInfos(resources) { dv := decodableVersions(r.EquivalentResourceMapper, r.GroupResource) - if err := updateStorageVersionFor(sc, serverID, r.GroupResource, r.EncodingVersion, dv); err != nil { + gr := r.GroupResource + // Group must be a valid subdomain in DNS (RFC 1123) + if len(gr.Group) == 0 { + gr.Group = "core" + } + if err := updateStorageVersionFor(sc, serverID, gr, r.EncodingVersion, dv); err != nil { utilruntime.HandleError(fmt.Errorf("failed to update storage version for %v: %v", r.GroupResource, err)) - s.recordStatusFailure(r, err) + s.recordStatusFailure(&r, err) hasFailure = true continue } klog.V(2).Infof("successfully updated storage version for %v", r.GroupResource) - s.recordStatusSuccess(r) + s.recordStatusSuccess(&r) } if hasFailure { return @@ -136,6 +155,45 @@ func (s *defaultManager) UpdateStorageVersions(kubeAPIServerClientConfig *rest.C s.setComplete() } +// dedupResourceInfos dedups ResourceInfos with the same underlying storage. +// ResourceInfos from the same Group with different Versions share the same underlying storage. +// ResourceInfos from different Groups may share the same underlying storage, e.g. +// networking.k8s.io ingresses and extensions ingresses. The StorageVersion manager +// only needs to update one StorageVersion for the equivalent Groups. +func dedupResourceInfos(infos []ResourceInfo) []ResourceInfo { + var ret []ResourceInfo + seen := make(map[schema.GroupResource]struct{}) + for _, info := range infos { + gr := info.GroupResource + if _, ok := seen[gr]; ok { + continue + } + gvrs := info.EquivalentResourceMapper.EquivalentResourcesFor(gr.WithVersion(""), "") + for _, gvr := range gvrs { + seen[gvr.GroupResource()] = struct{}{} + } + ret = append(ret, info) + } + return ret +} + +func sortResourceInfosByGroupResource(infos []ResourceInfo) { + sort.Sort(byGroupResource(infos)) +} + +type byGroupResource []ResourceInfo + +func (s byGroupResource) Len() int { return len(s) } + +func (s byGroupResource) Less(i, j int) bool { + if s[i].GroupResource.Group == s[j].GroupResource.Group { + return s[i].GroupResource.Resource < s[j].GroupResource.Resource + } + return s[i].GroupResource.Group < s[j].GroupResource.Group +} + +func (s byGroupResource) Swap(i, j int) { s[i], s[j] = s[j], s[i] } + // recordStatusSuccess marks updated ResourceInfo as completed. func (s *defaultManager) recordStatusSuccess(r *ResourceInfo) { s.mu.Lock() diff --git a/staging/src/k8s.io/apiserver/pkg/storageversion/manager_test.go b/staging/src/k8s.io/apiserver/pkg/storageversion/manager_test.go new file mode 100644 index 00000000000..0077cbdf51e --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storageversion/manager_test.go @@ -0,0 +1,62 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storageversion + +import ( + "reflect" + "testing" + + "github.com/google/go-cmp/cmp" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +func TestSortResourceInfosByGroupResource(t *testing.T) { + tests := []struct { + infos []ResourceInfo + expected []ResourceInfo + }{ + { + infos: nil, + expected: nil, + }, + { + infos: []ResourceInfo{}, + expected: []ResourceInfo{}, + }, + { + infos: []ResourceInfo{ + {GroupResource: schema.GroupResource{Group: "", Resource: "pods"}}, + {GroupResource: schema.GroupResource{Group: "", Resource: "nodes"}}, + {GroupResource: schema.GroupResource{Group: "networking.k8s.io", Resource: "ingresses"}}, + {GroupResource: schema.GroupResource{Group: "extensions", Resource: "ingresses"}}, + }, + expected: []ResourceInfo{ + {GroupResource: schema.GroupResource{Group: "", Resource: "nodes"}}, + {GroupResource: schema.GroupResource{Group: "", Resource: "pods"}}, + {GroupResource: schema.GroupResource{Group: "extensions", Resource: "ingresses"}}, + {GroupResource: schema.GroupResource{Group: "networking.k8s.io", Resource: "ingresses"}}, + }, + }, + } + + for _, tc := range tests { + sortResourceInfosByGroupResource(tc.infos) + if e, a := tc.expected, tc.infos; !reflect.DeepEqual(e, a) { + t.Errorf("unexpected: %v", cmp.Diff(e, a)) + } + } +} From 721897871697db007c2439ac298c579c0f201388 Mon Sep 17 00:00:00 2001 From: Chao Xu Date: Mon, 24 Feb 2020 15:36:08 -0800 Subject: [PATCH 03/11] Add a generic filter that blocks certain write requests before StorageVersions are updated during apiserver bootstrap. Also add a poststarthook to the aggregator which updates the StorageVersions via the storageversion.Manager --- cmd/kube-apiserver/app/aggregator.go | 8 ++ .../pkg/endpoints/filters/storageversion.go | 80 +++++++++++++++++++ .../handlers/responsewriters/errors.go | 7 ++ .../src/k8s.io/apiserver/pkg/server/config.go | 19 ++++- .../pkg/apiserver/apiserver.go | 31 +++++++ 5 files changed, 142 insertions(+), 3 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/endpoints/filters/storageversion.go diff --git a/cmd/kube-apiserver/app/aggregator.go b/cmd/kube-apiserver/app/aggregator.go index 535cf517bfe..5e72a59f78a 100644 --- a/cmd/kube-apiserver/app/aggregator.go +++ b/cmd/kube-apiserver/app/aggregator.go @@ -34,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apiserver/pkg/admission" "k8s.io/apiserver/pkg/features" + genericfeatures "k8s.io/apiserver/pkg/features" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/healthz" genericoptions "k8s.io/apiserver/pkg/server/options" @@ -67,6 +68,13 @@ func createAggregatorConfig( genericConfig.PostStartHooks = map[string]genericapiserver.PostStartHookConfigEntry{} genericConfig.RESTOptionsGetter = nil + if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) { + // Add StorageVersionPrecondition handler to aggregator-apiserver. + // The handler will block write requests to built-in resources until the + // target resources' storage versions are up-to-date. + genericConfig.BuildHandlerChainFunc = genericapiserver.BuildHandlerChainWithStorageVersionPrecondition + } + // override genericConfig.AdmissionControl with kube-aggregator's scheme, // because aggregator apiserver should use its own scheme to convert its own resources. err := commandOptions.Admission.ApplyTo( diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/storageversion.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/storageversion.go new file mode 100644 index 00000000000..9b164957c84 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/storageversion.go @@ -0,0 +1,80 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package filters + +import ( + "errors" + "fmt" + "net/http" + + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" + "k8s.io/apiserver/pkg/endpoints/request" + "k8s.io/apiserver/pkg/storageversion" + _ "k8s.io/component-base/metrics/prometheus/workqueue" // for workqueue metric registration + "k8s.io/klog/v2" +) + +// WithStorageVersionPrecondition checks if the storage version barrier has +// completed, if not, it only passes the following API requests: +// 1. non-resource requests, +// 2. read requests, +// 3. write requests to the storageversion API, +// 4. resources whose StorageVersion is not pending update, including non-persisted resources. +func WithStorageVersionPrecondition(handler http.Handler, svm storageversion.Manager) http.Handler { + if svm == nil { + // TODO(roycaihw): switch to warning after the feature graduate to beta/GA + klog.V(2).Infof("Storage Version barrier is disabled") + return handler + } + return http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + if svm.Completed() { + handler.ServeHTTP(w, req) + return + } + ctx := req.Context() + requestInfo, found := request.RequestInfoFrom(ctx) + if !found { + responsewriters.InternalError(w, req, errors.New("no RequestInfo found in the context")) + return + } + // Allow non-resource requests + if !requestInfo.IsResourceRequest { + handler.ServeHTTP(w, req) + return + } + // Allow read requests + if requestInfo.Verb == "get" || requestInfo.Verb == "list" || requestInfo.Verb == "watch" { + handler.ServeHTTP(w, req) + return + } + // Allow writes to the storage version API + if requestInfo.APIGroup == "internal.apiserver.k8s.io" && requestInfo.Resource == "storageversions" { + handler.ServeHTTP(w, req) + return + } + // If the resource's StorageVersion is not in the to-be-updated list, let it pass. + // Non-persisted resources are not in the to-be-updated list, so they will pass. + gr := schema.GroupResource{requestInfo.APIGroup, requestInfo.Resource} + if !svm.PendingUpdate(gr) { + handler.ServeHTTP(w, req) + return + } + + responsewriters.ServiceUnavailabeError(w, req, errors.New(fmt.Sprintf("wait for storage version registration to complete for resource: %v, last seen error: %v", gr, svm.LastUpdateError(gr)))) + }) +} diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/errors.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/errors.go index d13bee4d223..ea7387537f6 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/errors.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/errors.go @@ -76,3 +76,10 @@ func InternalError(w http.ResponseWriter, req *http.Request, err error) { http.StatusInternalServerError) utilruntime.HandleError(err) } + +// ServiceUnavailabeError renders a simple internal error +func ServiceUnavailabeError(w http.ResponseWriter, req *http.Request, err error) { + http.Error(w, sanitizer.Replace(fmt.Sprintf("Service Unavailable: %q: %v", req.RequestURI, err)), + http.StatusServiceUnavailable) + utilruntime.HandleError(err) +} diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index 80be10c01fd..c14759f419d 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -62,6 +62,7 @@ import ( "k8s.io/apiserver/pkg/server/healthz" "k8s.io/apiserver/pkg/server/routes" serverstore "k8s.io/apiserver/pkg/server/storage" + "k8s.io/apiserver/pkg/storageversion" "k8s.io/apiserver/pkg/util/feature" utilflowcontrol "k8s.io/apiserver/pkg/util/flowcontrol" "k8s.io/client-go/informers" @@ -227,6 +228,9 @@ type Config struct { // APIServerID is the ID of this API server APIServerID string + + // StorageVersionManager holds the storage versions of the API resources installed by this server. + StorageVersionManager storageversion.Manager } type RecommendedConfig struct { @@ -331,8 +335,9 @@ func NewConfig(codecs serializer.CodecFactory) *Config { // Default to treating watch as a long-running operation // Generic API servers have no inherent long-running subresources - LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()), - APIServerID: id, + LongRunningFunc: genericfilters.BasicLongRunningRequestCheck(sets.NewString("watch"), sets.NewString()), + APIServerID: id, + StorageVersionManager: storageversion.NewDefaultManager(), } } @@ -582,7 +587,9 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G maxRequestBodyBytes: c.MaxRequestBodyBytes, livezClock: clock.RealClock{}, - APIServerID: c.APIServerID, + + APIServerID: c.APIServerID, + StorageVersionManager: c.StorageVersionManager, } for { @@ -703,6 +710,12 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G return s, nil } +func BuildHandlerChainWithStorageVersionPrecondition(apiHandler http.Handler, c *Config) http.Handler { + // WithStorageVersionPrecondition needs the WithRequestInfo to run first + handler := genericapifilters.WithStorageVersionPrecondition(apiHandler, c.StorageVersionManager) + return DefaultBuildHandlerChain(handler, c) +} + func DefaultBuildHandlerChain(apiHandler http.Handler, c *Config) http.Handler { handler := filterlatency.TrackCompleted(apiHandler) handler = genericapifilters.WithAuthorization(handler, c.Authorization.Authorizer, c.Serializer) diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index 1b22b24e274..30d187ec45d 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -24,9 +24,12 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/sets" + "k8s.io/apimachinery/pkg/util/wait" + genericfeatures "k8s.io/apiserver/pkg/features" genericapiserver "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/egressselector" serverstorage "k8s.io/apiserver/pkg/server/storage" + utilfeature "k8s.io/apiserver/pkg/util/feature" "k8s.io/client-go/pkg/version" openapicommon "k8s.io/kube-openapi/pkg/common" @@ -264,6 +267,34 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg return nil }) + if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) { + // Spawn a goroutine in aggregator apiserver to update storage version for + // all built-in resources + s.GenericAPIServer.AddPostStartHookOrDie("built-in-resources-storage-version-updater", func(context genericapiserver.PostStartHookContext) error { + // Technically an apiserver only needs to update storage version once during bootstrap. + // Reconcile StorageVersion objects every 10 minutes will help in the case that the + // StorageVersion objects get accidentally modified/deleted by a different agent. In that + // case, the reconciliation ensures future storage migration still works. If nothing gets + // changed, the reconciliation update is a noop and gets short-circuited by the apiserver, + // therefore won't change the resource version and trigger storage migration. + go wait.PollImmediateUntil(10*time.Minute, func() (bool, error) { + // All apiservers (aggregator-apiserver, kube-apiserver, apiextensions-apiserver) + // share the same generic apiserver config. The same StorageVersion manager is used + // to register all built-in resources when the generic apiservers install APIs. + s.GenericAPIServer.StorageVersionManager.UpdateStorageVersions(context.LoopbackClientConfig, s.GenericAPIServer.APIServerID) + return false, nil + }, context.StopCh) + // Once the storage version updater finishes the first round of update, + // the PostStartHook will return to unblock /healthz. The handler chain + // won't block write requests anymore. Check every second since it's not + // expensive. + wait.PollImmediateUntil(1*time.Second, func() (bool, error) { + return s.GenericAPIServer.StorageVersionManager.Completed(), nil + }, context.StopCh) + return nil + }) + } + return s, nil } From 22452917c20f6018a19e1e427c420388c8072a5d Mon Sep 17 00:00:00 2001 From: Chao Xu Date: Mon, 17 Feb 2020 14:19:32 -0800 Subject: [PATCH 04/11] make some rbac and scheduling post start hooks tolerate the apiserver bootstrap delay caused by installing storage versions. --- pkg/registry/rbac/rest/storage_rbac.go | 21 +++++++++++++++---- .../scheduling/rest/storage_scheduling.go | 11 +++++++--- 2 files changed, 25 insertions(+), 7 deletions(-) diff --git a/pkg/registry/rbac/rest/storage_rbac.go b/pkg/registry/rbac/rest/storage_rbac.go index 8776ddbfe96..6b853d5174b 100644 --- a/pkg/registry/rbac/rest/storage_rbac.go +++ b/pkg/registry/rbac/rest/storage_rbac.go @@ -26,6 +26,7 @@ import ( rbacapiv1 "k8s.io/api/rbac/v1" rbacapiv1alpha1 "k8s.io/api/rbac/v1alpha1" rbacapiv1beta1 "k8s.io/api/rbac/v1beta1" + "k8s.io/apimachinery/pkg/api/errors" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" @@ -160,6 +161,14 @@ type PolicyData struct { ClusterRoleBindingsToSplit map[string]rbacapiv1.ClusterRoleBinding } +func isConflictOrServiceUnavailable(err error) bool { + return errors.IsConflict(err) || errors.IsServiceUnavailable(err) +} + +func retryOnConflictOrServiceUnavailable(backoff wait.Backoff, fn func() error) error { + return retry.OnError(backoff, isConflictOrServiceUnavailable, fn) +} + func (p *PolicyData) EnsureRBACPolicy() genericapiserver.PostStartHookFunc { return func(hookContext genericapiserver.PostStartHookContext) error { // initializing roles is really important. On some e2e runs, we've seen cases where etcd is down when the server @@ -206,7 +215,8 @@ func (p *PolicyData) EnsureRBACPolicy() genericapiserver.PostStartHookFunc { Client: reconciliation.ClusterRoleModifier{Client: clientset.ClusterRoles()}, Confirm: true, } - err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + // ServiceUnavailble error is returned when the API server is blocked by storage version updates + err := retryOnConflictOrServiceUnavailable(retry.DefaultBackoff, func() error { result, err := opts.Run() if err != nil { return err @@ -234,7 +244,8 @@ func (p *PolicyData) EnsureRBACPolicy() genericapiserver.PostStartHookFunc { Client: reconciliation.ClusterRoleBindingClientAdapter{Client: clientset.ClusterRoleBindings()}, Confirm: true, } - err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + // ServiceUnavailble error is returned when the API server is blocked by storage version updates + err := retryOnConflictOrServiceUnavailable(retry.DefaultBackoff, func() error { result, err := opts.Run() if err != nil { return err @@ -265,7 +276,8 @@ func (p *PolicyData) EnsureRBACPolicy() genericapiserver.PostStartHookFunc { Client: reconciliation.RoleModifier{Client: clientset, NamespaceClient: coreclientset.Namespaces()}, Confirm: true, } - err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + // ServiceUnavailble error is returned when the API server is blocked by storage version updates + err := retryOnConflictOrServiceUnavailable(retry.DefaultBackoff, func() error { result, err := opts.Run() if err != nil { return err @@ -295,7 +307,8 @@ func (p *PolicyData) EnsureRBACPolicy() genericapiserver.PostStartHookFunc { Client: reconciliation.RoleBindingClientAdapter{Client: clientset, NamespaceClient: coreclientset.Namespaces()}, Confirm: true, } - err := retry.RetryOnConflict(retry.DefaultBackoff, func() error { + // ServiceUnavailble error is returned when the API server is blocked by storage version updates + err := retryOnConflictOrServiceUnavailable(retry.DefaultBackoff, func() error { result, err := opts.Run() if err != nil { return err diff --git a/pkg/registry/scheduling/rest/storage_scheduling.go b/pkg/registry/scheduling/rest/storage_scheduling.go index a20bca52748..6c1a4a58a01 100644 --- a/pkg/registry/scheduling/rest/storage_scheduling.go +++ b/pkg/registry/scheduling/rest/storage_scheduling.go @@ -128,11 +128,16 @@ func AddSystemPriorityClasses() genericapiserver.PostStartHookFunc { if err != nil { if apierrors.IsNotFound(err) { _, err := schedClientSet.PriorityClasses().Create(context.TODO(), pc, metav1.CreateOptions{}) - if err != nil && !apierrors.IsAlreadyExists(err) { - return false, err - } else { + if err == nil || apierrors.IsAlreadyExists(err) { klog.Infof("created PriorityClass %s with value %v", pc.Name, pc.Value) + continue } + // ServiceUnavailble error is returned when the API server is blocked by storage version updates + if apierrors.IsServiceUnavailable(err) { + klog.Infof("going to retry, unable to create PriorityClass %s: %v", pc.Name, err) + return false, nil + } + return false, err } else { // Unable to get the priority class for reasons other than "not found". klog.Warningf("unable to get PriorityClass %v: %v. Retrying...", pc.Name, err) From fa1805cc5cf3a13687775cda00ed555010668dca Mon Sep 17 00:00:00 2001 From: Chao Xu Date: Mon, 24 Feb 2020 15:26:52 -0800 Subject: [PATCH 05/11] Add an integration test. To make sure that the storage version filter can block certain requests until the storage version updates are completed, and that the apiserver works properly after the storage version updates are done. --- cmd/kube-apiserver/app/testing/testserver.go | 86 +++--- .../storage_version_filter_test.go | 255 ++++++++++++++++++ .../storage_version_main_test.go | 27 ++ 3 files changed, 329 insertions(+), 39 deletions(-) create mode 100644 test/integration/storageversion/storage_version_filter_test.go create mode 100644 test/integration/storageversion/storage_version_main_test.go diff --git a/cmd/kube-apiserver/app/testing/testserver.go b/cmd/kube-apiserver/app/testing/testserver.go index 5258d2a41f8..2babadc747c 100644 --- a/cmd/kube-apiserver/app/testing/testserver.go +++ b/cmd/kube-apiserver/app/testing/testserver.go @@ -34,6 +34,7 @@ import ( "k8s.io/apimachinery/pkg/util/wait" "k8s.io/apiserver/pkg/registry/generic/registry" "k8s.io/apiserver/pkg/storage/storagebackend" + "k8s.io/apiserver/pkg/storageversion" "k8s.io/client-go/kubernetes" restclient "k8s.io/client-go/rest" "k8s.io/client-go/util/cert" @@ -49,9 +50,10 @@ type TearDownFunc func() type TestServerInstanceOptions struct { // DisableStorageCleanup Disable the automatic storage cleanup DisableStorageCleanup bool - // Enable cert-auth for the kube-apiserver EnableCertAuth bool + // Wrap the storage version interface of the created server's generic server. + StorageVersionWrapFunc func(storageversion.Manager) storageversion.Manager } // TestServer return values supplied by kube-test-ApiServer @@ -182,6 +184,9 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo t.Logf("runtime-config=%v", completedOptions.APIEnablement.RuntimeConfig) t.Logf("Starting kube-apiserver on port %d...", s.SecureServing.BindPort) server, err := app.CreateServerChain(completedOptions, stopCh) + if instanceOptions.StorageVersionWrapFunc != nil { + server.GenericAPIServer.StorageVersionManager = instanceOptions.StorageVersionWrapFunc(server.GenericAPIServer.StorageVersionManager) + } if err != nil { return result, fmt.Errorf("failed to create server chain: %v", err) } @@ -196,51 +201,54 @@ func StartTestServer(t Logger, instanceOptions *TestServerInstanceOptions, custo } }(stopCh) - t.Logf("Waiting for /healthz to be ok...") + // skip healthz check when we test the storage version manager poststart hook + if instanceOptions.StorageVersionWrapFunc == nil { + t.Logf("Waiting for /healthz to be ok...") - client, err := kubernetes.NewForConfig(server.GenericAPIServer.LoopbackClientConfig) - if err != nil { - return result, fmt.Errorf("failed to create a client: %v", err) - } - - // wait until healthz endpoint returns ok - err = wait.Poll(100*time.Millisecond, time.Minute, func() (bool, error) { - select { - case err := <-errCh: - return false, err - default: + client, err := kubernetes.NewForConfig(server.GenericAPIServer.LoopbackClientConfig) + if err != nil { + return result, fmt.Errorf("failed to create a client: %v", err) } - result := client.CoreV1().RESTClient().Get().AbsPath("/healthz").Do(context.TODO()) - status := 0 - result.StatusCode(&status) - if status == 200 { - return true, nil - } - return false, nil - }) - if err != nil { - return result, fmt.Errorf("failed to wait for /healthz to return ok: %v", err) - } + // wait until healthz endpoint returns ok + err = wait.Poll(100*time.Millisecond, time.Minute, func() (bool, error) { + select { + case err := <-errCh: + return false, err + default: + } - // wait until default namespace is created - err = wait.Poll(100*time.Millisecond, 30*time.Second, func() (bool, error) { - select { - case err := <-errCh: - return false, err - default: - } - - if _, err := client.CoreV1().Namespaces().Get(context.TODO(), "default", metav1.GetOptions{}); err != nil { - if !errors.IsNotFound(err) { - t.Logf("Unable to get default namespace: %v", err) + result := client.CoreV1().RESTClient().Get().AbsPath("/healthz").Do(context.TODO()) + status := 0 + result.StatusCode(&status) + if status == 200 { + return true, nil } return false, nil + }) + if err != nil { + return result, fmt.Errorf("failed to wait for /healthz to return ok: %v", err) + } + + // wait until default namespace is created + err = wait.Poll(100*time.Millisecond, 30*time.Second, func() (bool, error) { + select { + case err := <-errCh: + return false, err + default: + } + + if _, err := client.CoreV1().Namespaces().Get(context.TODO(), "default", metav1.GetOptions{}); err != nil { + if !errors.IsNotFound(err) { + t.Logf("Unable to get default namespace: %v", err) + } + return false, nil + } + return true, nil + }) + if err != nil { + return result, fmt.Errorf("failed to wait for default namespace to be created: %v", err) } - return true, nil - }) - if err != nil { - return result, fmt.Errorf("failed to wait for default namespace to be created: %v", err) } // from here the caller must call tearDown diff --git a/test/integration/storageversion/storage_version_filter_test.go b/test/integration/storageversion/storage_version_filter_test.go new file mode 100644 index 00000000000..1bc4c64ee29 --- /dev/null +++ b/test/integration/storageversion/storage_version_filter_test.go @@ -0,0 +1,255 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storageversion + +import ( + "context" + "testing" + "time" + + "k8s.io/api/apiserverinternal/v1alpha1" + authenticationv1 "k8s.io/api/authentication/v1" + v1 "k8s.io/api/core/v1" + apiextensionsclientset "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset" + "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/features" + "k8s.io/apiserver/pkg/storageversion" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/client-go/dynamic" + clientset "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + featuregatetesting "k8s.io/component-base/featuregate/testing" + apiregistrationv1beta1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1" + aggregatorclient "k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset" + kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" + "k8s.io/kubernetes/test/integration/etcd" + "k8s.io/kubernetes/test/integration/framework" +) + +type wrappedStorageVersionManager struct { + storageversion.Manager + startUpdateSV <-chan struct{} + updateFinished chan<- struct{} + finishUpdateSV <-chan struct{} + completed <-chan struct{} +} + +func (w *wrappedStorageVersionManager) UpdateStorageVersions(loopbackClientConfig *rest.Config, serverID string) { + <-w.startUpdateSV + w.Manager.UpdateStorageVersions(loopbackClientConfig, serverID) + close(w.updateFinished) + <-w.finishUpdateSV +} + +func (w *wrappedStorageVersionManager) Completed() bool { + select { + case <-w.completed: + return true + default: + return false + } +} + +func assertBlocking(name string, t *testing.T, err error, shouldBlock bool) { + if shouldBlock { + if err == nil || !errors.IsServiceUnavailable(err) { + t.Fatalf("%q should be rejected with service unavailable error, got %v", name, err) + } + } else { + if err != nil { + t.Fatalf("%q should be allowed, got %v", name, err) + } + } +} + +func testBuiltinResourceWrite(t *testing.T, cfg *rest.Config, shouldBlock bool) { + client := clientset.NewForConfigOrDie(cfg) + _, err := client.CoreV1().Namespaces().Create(context.TODO(), &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "test"}}, metav1.CreateOptions{}) + assertBlocking("writes to built in resources", t, err, shouldBlock) +} + +func testCRDWrite(t *testing.T, cfg *rest.Config, shouldBlock bool) { + crdClient := apiextensionsclientset.NewForConfigOrDie(cfg) + _, err := crdClient.ApiextensionsV1beta1().CustomResourceDefinitions().Create(context.TODO(), etcd.GetCustomResourceDefinitionData()[1], metav1.CreateOptions{}) + assertBlocking("writes to CRD", t, err, shouldBlock) +} + +func testAPIServiceWrite(t *testing.T, cfg *rest.Config, shouldBlock bool) { + aggregatorClient := aggregatorclient.NewForConfigOrDie(cfg) + _, err := aggregatorClient.ApiregistrationV1beta1().APIServices().Create(context.TODO(), &apiregistrationv1beta1.APIService{ + ObjectMeta: metav1.ObjectMeta{Name: "v1alpha1.wardle.example.com"}, + Spec: apiregistrationv1beta1.APIServiceSpec{ + Service: &apiregistrationv1beta1.ServiceReference{ + Namespace: "kube-wardle", + Name: "api", + }, + Group: "wardle.example.com", + Version: "v1alpha1", + GroupPriorityMinimum: 200, + VersionPriority: 200, + }, + }, metav1.CreateOptions{}) + assertBlocking("writes to APIServices", t, err, shouldBlock) +} + +func testCRWrite(t *testing.T, cfg *rest.Config, shouldBlock bool) { + dynamicClient := dynamic.NewForConfigOrDie(cfg) + crclient := dynamicClient.Resource(schema.GroupVersionResource{Group: "cr.bar.com", Version: "v1", Resource: "foos"}).Namespace("default") + _, err := crclient.Create(context.TODO(), &unstructured.Unstructured{Object: map[string]interface{}{"apiVersion": "cr.bar.com/v1", "kind": "Foo", "metadata": map[string]interface{}{"generateName": "test-"}}}, metav1.CreateOptions{}) + assertBlocking("writes to CR", t, err, shouldBlock) +} + +func testStorageVersionWrite(t *testing.T, cfg *rest.Config, shouldBlock bool) { + apiserverClient := clientset.NewForConfigOrDie(cfg) + _, err := apiserverClient.InternalV1alpha1().StorageVersions().Create(context.TODO(), &v1alpha1.StorageVersion{ObjectMeta: metav1.ObjectMeta{GenerateName: "test.resource"}}, metav1.CreateOptions{}) + assertBlocking("writes to Storage Version", t, err, shouldBlock) +} + +func testNonPersistedWrite(t *testing.T, cfg *rest.Config, shouldBlock bool) { + client := clientset.NewForConfigOrDie(cfg) + _, err := client.AuthenticationV1().TokenReviews().Create(context.TODO(), &authenticationv1.TokenReview{ + Spec: authenticationv1.TokenReviewSpec{ + Token: "some token", + }, + }, metav1.CreateOptions{}) + assertBlocking("non-persisted write", t, err, shouldBlock) +} + +func testBuiltinResourceRead(t *testing.T, cfg *rest.Config, shouldBlock bool) { + client := clientset.NewForConfigOrDie(cfg) + _, err := client.CoreV1().Namespaces().List(context.TODO(), metav1.ListOptions{}) + assertBlocking("reads of built-in resources", t, err, shouldBlock) +} + +// TestStorageVersionBootstrap ensures that before the StorageVersions are +// updated, only the following the request are accepted by the apiserver: +// 1. read requests +// 2. non-persisting write requests +// 3. write requests to the storageversion API +// 4. requests to CR or aggregated API +func TestStorageVersionBootstrap(t *testing.T) { + // Start server and create CRD + etcdConfig := framework.SharedEtcd() + server := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, etcdConfig) + etcd.CreateTestCRDs(t, apiextensionsclientset.NewForConfigOrDie(server.ClientConfig), false, etcd.GetCustomResourceDefinitionData()[0]) + server.TearDownFn() + + startUpdateSV := make(chan struct{}) + finishUpdateSV := make(chan struct{}) + updateFinished := make(chan struct{}) + completed := make(chan struct{}) + wrapperFunc := func(delegate storageversion.Manager) storageversion.Manager { + return &wrappedStorageVersionManager{ + startUpdateSV: startUpdateSV, + finishUpdateSV: finishUpdateSV, + updateFinished: updateFinished, + completed: completed, + Manager: delegate, + } + } + // Restart api server, enable the storage version API and the feature gates. + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.StorageVersionAPI, true)() + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true)() + server = kubeapiservertesting.StartTestServerOrDie(t, + &kubeapiservertesting.TestServerInstanceOptions{ + StorageVersionWrapFunc: wrapperFunc, + }, + []string{ + // force enable all resources to ensure that the storage updates can handle cross group resources. + // TODO: drop these once we stop allowing them to be served. + "--runtime-config=api/all=true,extensions/v1beta1/deployments=true,extensions/v1beta1/daemonsets=true,extensions/v1beta1/replicasets=true,extensions/v1beta1/podsecuritypolicies=true,extensions/v1beta1/networkpolicies=true,internal.apiserver.k8s.io/v1alpha1=true", + }, + etcdConfig) + defer server.TearDownFn() + + cfg := rest.CopyConfig(server.ClientConfig) + + t.Run("before storage version update", func(t *testing.T) { + t.Run("write to k8s native API object should be blocked", func(t *testing.T) { + testBuiltinResourceWrite(t, cfg, true) + }) + t.Run("write to CRD should be blocked", func(t *testing.T) { + testCRDWrite(t, cfg, true) + }) + t.Run("write to APIService should be blocked", func(t *testing.T) { + testAPIServiceWrite(t, cfg, true) + }) + t.Run("write to CR should pass", func(t *testing.T) { + testCRWrite(t, cfg, false) + }) + t.Run("write to the storage version API should pass", func(t *testing.T) { + testStorageVersionWrite(t, cfg, false) + }) + t.Run("write to non-persisted API should pass", func(t *testing.T) { + testNonPersistedWrite(t, cfg, false) + }) + t.Run("read of k8s native API should pass", func(t *testing.T) { + testBuiltinResourceRead(t, cfg, false) + }) + // TODO: Write to aggregated API should pass. + }) + + // After the storage versions are updated, even though the + // StorageVersionManager.Complete() still returns false, the filter + // should not block any request. + close(startUpdateSV) + <-updateFinished + t.Run("after storage version update", func(t *testing.T) { + t.Run("write to k8s native API object should pass", func(t *testing.T) { + testBuiltinResourceWrite(t, cfg, false) + }) + t.Run("write to CRD should pass", func(t *testing.T) { + testCRDWrite(t, cfg, false) + }) + t.Run("write to APIService should pass", func(t *testing.T) { + testAPIServiceWrite(t, cfg, false) + }) + t.Run("write to the storage version API should pass", func(t *testing.T) { + testStorageVersionWrite(t, cfg, false) + }) + t.Run("write to non-persisted API should pass", func(t *testing.T) { + testNonPersistedWrite(t, cfg, false) + }) + t.Run("read of k8s native API should pass", func(t *testing.T) { + testBuiltinResourceRead(t, cfg, false) + }) + }) + + // After the StorageVersionManager.Complete() returns true, the server should become healthy. + close(completed) + close(finishUpdateSV) + t.Run("after storage version manager complete", func(t *testing.T) { + // wait until healthz endpoint returns ok + client := clientset.NewForConfigOrDie(cfg) + err := wait.Poll(100*time.Millisecond, 10*time.Second, func() (bool, error) { + result := client.CoreV1().RESTClient().Get().AbsPath("/healthz").Do(context.TODO()) + status := 0 + result.StatusCode(&status) + if status == 200 { + return true, nil + } + return false, nil + }) + if err != nil { + t.Errorf("failed to wait for /healthz to return ok: %v", err) + } + }) +} diff --git a/test/integration/storageversion/storage_version_main_test.go b/test/integration/storageversion/storage_version_main_test.go new file mode 100644 index 00000000000..4aca4618db7 --- /dev/null +++ b/test/integration/storageversion/storage_version_main_test.go @@ -0,0 +1,27 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storageversion + +import ( + "testing" + + "k8s.io/kubernetes/test/integration/framework" +) + +func TestMain(m *testing.M) { + framework.EtcdMain(m.Run) +} From b5b93004b59295267dff5c7fa68eb2e8877ca3a2 Mon Sep 17 00:00:00 2001 From: Haowei Cai Date: Mon, 10 Aug 2020 23:05:01 -0700 Subject: [PATCH 06/11] generated --- cmd/kube-apiserver/app/testing/BUILD | 1 + .../src/k8s.io/apiserver/pkg/endpoints/BUILD | 1 + .../apiserver/pkg/endpoints/filters/BUILD | 3 ++ staging/src/k8s.io/apiserver/pkg/server/BUILD | 1 + .../k8s.io/apiserver/pkg/storageversion/BUILD | 6 ++- test/integration/BUILD | 1 + test/integration/storageversion/BUILD | 47 +++++++++++++++++++ vendor/modules.txt | 1 + 8 files changed, 60 insertions(+), 1 deletion(-) create mode 100644 test/integration/storageversion/BUILD diff --git a/cmd/kube-apiserver/app/testing/BUILD b/cmd/kube-apiserver/app/testing/BUILD index cc267245a6c..baacf9a93a8 100644 --- a/cmd/kube-apiserver/app/testing/BUILD +++ b/cmd/kube-apiserver/app/testing/BUILD @@ -18,6 +18,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", "//staging/src/k8s.io/apiserver/pkg/registry/generic/registry:go_default_library", "//staging/src/k8s.io/apiserver/pkg/storage/storagebackend:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/storageversion:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", "//staging/src/k8s.io/client-go/util/cert:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/BUILD b/staging/src/k8s.io/apiserver/pkg/endpoints/BUILD index 640e8999840..9cef55b16ea 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/BUILD @@ -90,6 +90,7 @@ go_library( "//staging/src/k8s.io/apiserver/pkg/endpoints/warning:go_default_library", "//staging/src/k8s.io/apiserver/pkg/features:go_default_library", "//staging/src/k8s.io/apiserver/pkg/registry/rest:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/storageversion:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/component-base/version:go_default_library", "//vendor/github.com/emicklei/go-restful:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/BUILD b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/BUILD index 6410c8303a6..b03d79a2a55 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/BUILD @@ -58,6 +58,7 @@ go_library( "metrics.go", "request_received_time.go", "requestinfo.go", + "storageversion.go", "warning.go", ], importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/endpoints/filters", @@ -82,9 +83,11 @@ go_library( "//staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters:go_default_library", "//staging/src/k8s.io/apiserver/pkg/endpoints/request:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/httplog:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/storageversion:go_default_library", "//staging/src/k8s.io/apiserver/pkg/warning:go_default_library", "//staging/src/k8s.io/component-base/metrics:go_default_library", "//staging/src/k8s.io/component-base/metrics/legacyregistry:go_default_library", + "//staging/src/k8s.io/component-base/metrics/prometheus/workqueue:go_default_library", "//vendor/k8s.io/klog/v2:go_default_library", ], ) diff --git a/staging/src/k8s.io/apiserver/pkg/server/BUILD b/staging/src/k8s.io/apiserver/pkg/server/BUILD index c63c7fafba7..39e7481ed7f 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/server/BUILD @@ -117,6 +117,7 @@ go_library( "//staging/src/k8s.io/apiserver/pkg/server/mux:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/routes:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/storage:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/storageversion:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/flowcontrol:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/openapi:go_default_library", diff --git a/staging/src/k8s.io/apiserver/pkg/storageversion/BUILD b/staging/src/k8s.io/apiserver/pkg/storageversion/BUILD index 98cc5837204..247a5156958 100644 --- a/staging/src/k8s.io/apiserver/pkg/storageversion/BUILD +++ b/staging/src/k8s.io/apiserver/pkg/storageversion/BUILD @@ -25,10 +25,14 @@ go_library( go_test( name = "go_default_test", - srcs = ["updater_test.go"], + srcs = [ + "manager_test.go", + "updater_test.go", + ], embed = [":go_default_library"], deps = [ "//staging/src/k8s.io/api/apiserverinternal/v1alpha1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", "//vendor/github.com/google/go-cmp/cmp:go_default_library", ], ) diff --git a/test/integration/BUILD b/test/integration/BUILD index 901d12ef8dc..2b77bb78b94 100644 --- a/test/integration/BUILD +++ b/test/integration/BUILD @@ -79,6 +79,7 @@ filegroup( "//test/integration/serving:all-srcs", "//test/integration/statefulset:all-srcs", "//test/integration/storageclasses:all-srcs", + "//test/integration/storageversion:all-srcs", "//test/integration/tls:all-srcs", "//test/integration/ttlcontroller:all-srcs", "//test/integration/util:all-srcs", diff --git a/test/integration/storageversion/BUILD b/test/integration/storageversion/BUILD new file mode 100644 index 00000000000..a9336acb52d --- /dev/null +++ b/test/integration/storageversion/BUILD @@ -0,0 +1,47 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_test") + +go_test( + name = "go_default_test", + srcs = [ + "storage_version_filter_test.go", + "storage_version_main_test.go", + ], + tags = ["integration"], + deps = [ + "//cmd/kube-apiserver/app/testing:go_default_library", + "//staging/src/k8s.io/api/apiserverinternal/v1alpha1:go_default_library", + "//staging/src/k8s.io/api/authentication/v1:go_default_library", + "//staging/src/k8s.io/api/core/v1:go_default_library", + "//staging/src/k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1/unstructured:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/features:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/storageversion:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", + "//staging/src/k8s.io/client-go/dynamic:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes:go_default_library", + "//staging/src/k8s.io/client-go/rest:go_default_library", + "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library", + "//staging/src/k8s.io/kube-aggregator/pkg/apis/apiregistration/v1beta1:go_default_library", + "//staging/src/k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset:go_default_library", + "//test/integration/etcd:go_default_library", + "//test/integration/framework:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/vendor/modules.txt b/vendor/modules.txt index 1ae0a154465..d4fb9967cd8 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1860,6 +1860,7 @@ k8s.io/apiserver/pkg/storage/value/encrypt/envelope/testing k8s.io/apiserver/pkg/storage/value/encrypt/envelope/v1beta1 k8s.io/apiserver/pkg/storage/value/encrypt/identity k8s.io/apiserver/pkg/storage/value/encrypt/secretbox +k8s.io/apiserver/pkg/storageversion k8s.io/apiserver/pkg/util/apihelpers k8s.io/apiserver/pkg/util/dryrun k8s.io/apiserver/pkg/util/feature From 8a1d8f7fd58ee7eaa9a8985048a56e02c9ad40c1 Mon Sep 17 00:00:00 2001 From: Haowei Cai Date: Wed, 4 Nov 2020 22:29:52 -0800 Subject: [PATCH 07/11] return a Status formatted JSON response --- .../apiserver/pkg/endpoints/filters/storageversion.go | 7 +++++-- .../pkg/endpoints/handlers/responsewriters/errors.go | 7 ------- staging/src/k8s.io/apiserver/pkg/server/config.go | 2 +- 3 files changed, 6 insertions(+), 10 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/storageversion.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/storageversion.go index 9b164957c84..265cded2ec9 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/storageversion.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/storageversion.go @@ -21,6 +21,8 @@ import ( "fmt" "net/http" + apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apiserver/pkg/endpoints/handlers/responsewriters" "k8s.io/apiserver/pkg/endpoints/request" @@ -35,7 +37,7 @@ import ( // 2. read requests, // 3. write requests to the storageversion API, // 4. resources whose StorageVersion is not pending update, including non-persisted resources. -func WithStorageVersionPrecondition(handler http.Handler, svm storageversion.Manager) http.Handler { +func WithStorageVersionPrecondition(handler http.Handler, svm storageversion.Manager, s runtime.NegotiatedSerializer) http.Handler { if svm == nil { // TODO(roycaihw): switch to warning after the feature graduate to beta/GA klog.V(2).Infof("Storage Version barrier is disabled") @@ -75,6 +77,7 @@ func WithStorageVersionPrecondition(handler http.Handler, svm storageversion.Man return } - responsewriters.ServiceUnavailabeError(w, req, errors.New(fmt.Sprintf("wait for storage version registration to complete for resource: %v, last seen error: %v", gr, svm.LastUpdateError(gr)))) + gv := schema.GroupVersion{requestInfo.APIGroup, requestInfo.APIVersion} + responsewriters.ErrorNegotiated(apierrors.NewServiceUnavailable(fmt.Sprintf("wait for storage version registration to complete for resource: %v, last seen error: %v", gr, svm.LastUpdateError(gr))), s, gv, w, req) }) } diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/errors.go b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/errors.go index ea7387537f6..d13bee4d223 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/errors.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/handlers/responsewriters/errors.go @@ -76,10 +76,3 @@ func InternalError(w http.ResponseWriter, req *http.Request, err error) { http.StatusInternalServerError) utilruntime.HandleError(err) } - -// ServiceUnavailabeError renders a simple internal error -func ServiceUnavailabeError(w http.ResponseWriter, req *http.Request, err error) { - http.Error(w, sanitizer.Replace(fmt.Sprintf("Service Unavailable: %q: %v", req.RequestURI, err)), - http.StatusServiceUnavailable) - utilruntime.HandleError(err) -} diff --git a/staging/src/k8s.io/apiserver/pkg/server/config.go b/staging/src/k8s.io/apiserver/pkg/server/config.go index c14759f419d..172a341cac7 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/config.go +++ b/staging/src/k8s.io/apiserver/pkg/server/config.go @@ -712,7 +712,7 @@ func (c completedConfig) New(name string, delegationTarget DelegationTarget) (*G func BuildHandlerChainWithStorageVersionPrecondition(apiHandler http.Handler, c *Config) http.Handler { // WithStorageVersionPrecondition needs the WithRequestInfo to run first - handler := genericapifilters.WithStorageVersionPrecondition(apiHandler, c.StorageVersionManager) + handler := genericapifilters.WithStorageVersionPrecondition(apiHandler, c.StorageVersionManager, c.Serializer) return DefaultBuildHandlerChain(handler, c) } From 23ef9b51a8fe4d64c20ada99cfead90447a6ffc2 Mon Sep 17 00:00:00 2001 From: Haowei Cai Date: Sun, 8 Nov 2020 18:52:05 -0800 Subject: [PATCH 08/11] updater correctly updates storageversion status --- .../k8s.io/apiserver/pkg/storageversion/updater.go | 12 +++++++++--- 1 file changed, 9 insertions(+), 3 deletions(-) diff --git a/staging/src/k8s.io/apiserver/pkg/storageversion/updater.go b/staging/src/k8s.io/apiserver/pkg/storageversion/updater.go index 110accf22da..10927fb0f03 100644 --- a/staging/src/k8s.io/apiserver/pkg/storageversion/updater.go +++ b/staging/src/k8s.io/apiserver/pkg/storageversion/updater.go @@ -31,7 +31,7 @@ import ( // Client has the methods required to update the storage version. type Client interface { Create(context.Context, *v1alpha1.StorageVersion, metav1.CreateOptions) (*v1alpha1.StorageVersion, error) - Update(context.Context, *v1alpha1.StorageVersion, metav1.UpdateOptions) (*v1alpha1.StorageVersion, error) + UpdateStatus(context.Context, *v1alpha1.StorageVersion, metav1.UpdateOptions) (*v1alpha1.StorageVersion, error) Get(context.Context, string, metav1.GetOptions) (*v1alpha1.StorageVersion, error) } @@ -91,10 +91,16 @@ func singleUpdate(c Client, apiserverID string, gr schema.GroupResource, encodin } updatedSV := localUpdateStorageVersion(sv, apiserverID, encodingVersion, decodableVersions) if shouldCreate { - _, err := c.Create(context.TODO(), updatedSV, metav1.CreateOptions{}) + createdSV, err := c.Create(context.TODO(), updatedSV, metav1.CreateOptions{}) + if err != nil { + return err + } + // assign the calculated status to the object just created, then update status + createdSV.Status = updatedSV.Status + _, err = c.UpdateStatus(context.TODO(), createdSV, metav1.UpdateOptions{}) return err } - _, err = c.Update(context.TODO(), updatedSV, metav1.UpdateOptions{}) + _, err = c.UpdateStatus(context.TODO(), updatedSV, metav1.UpdateOptions{}) return err } From 7bcd25907a49c4641843457f1eef3542f82386bb Mon Sep 17 00:00:00 2001 From: Haowei Cai Date: Sun, 8 Nov 2020 18:52:29 -0800 Subject: [PATCH 09/11] apiserver correctly validates encoding/decodable versions --- .../validation/validation.go | 47 +++++++++++++++- .../validation/validation_test.go | 56 +++++++++++++++++++ 2 files changed, 101 insertions(+), 2 deletions(-) diff --git a/pkg/apis/apiserverinternal/validation/validation.go b/pkg/apis/apiserverinternal/validation/validation.go index 6f2e73c91fe..928ffec7bb0 100644 --- a/pkg/apis/apiserverinternal/validation/validation.go +++ b/pkg/apis/apiserverinternal/validation/validation.go @@ -83,13 +83,13 @@ func validateServerStorageVersion(ssv apiserverinternal.ServerStorageVersion, fl for _, msg := range apimachineryvalidation.NameIsDNSSubdomain(ssv.APIServerID, false) { allErrs = append(allErrs, field.Invalid(fldPath.Child("apiServerID"), ssv.APIServerID, msg)) } - if errs := utilvalidation.IsDNS1035Label(ssv.EncodingVersion); len(errs) > 0 { + if errs := isValidAPIVersion(ssv.EncodingVersion); len(errs) > 0 { allErrs = append(allErrs, field.Invalid(fldPath.Child("encodingVersion"), ssv.EncodingVersion, strings.Join(errs, ","))) } found := false for i, dv := range ssv.DecodableVersions { - if errs := utilvalidation.IsDNS1035Label(dv); len(errs) > 0 { + if errs := isValidAPIVersion(dv); len(errs) > 0 { allErrs = append(allErrs, field.Invalid(fldPath.Child("decodableVersions").Index(i), dv, strings.Join(errs, ","))) } if dv == ssv.EncodingVersion { @@ -158,3 +158,46 @@ func validateStorageVersionCondition(conditions []apiserverinternal.StorageVersi } return allErrs } + +const dns1035LabelFmt string = "[a-z]([-a-z0-9]*[a-z0-9])?" +const dns1035LabelErrMsg string = "a DNS-1035 label must consist of lower case alphanumeric characters or '-', start with an alphabetic character, and end with an alphanumeric character" + +// isValidAPIVersion tests whether the value passed is a valid apiVersion. A +// valid apiVersion contains a version string that matches DNS_LABEL format, +// with an optional group/ prefix, where the group string matches DNS_SUBDOMAIN +// format. If the value is not valid, a list of error strings is returned. +// Otherwise an empty list (or nil) is returned. +func isValidAPIVersion(apiVersion string) []string { + var errs []string + parts := strings.Split(apiVersion, "/") + var version string + switch len(parts) { + case 1: + version = parts[0] + case 2: + var group string + group, version = parts[0], parts[1] + if len(group) == 0 { + errs = append(errs, "group part "+utilvalidation.EmptyError()) + } else if msgs := utilvalidation.IsDNS1123Subdomain(group); len(msgs) != 0 { + errs = append(errs, prefixEach(msgs, "group part ")...) + } + default: + return append(errs, "an apiVersion "+utilvalidation.RegexError(dns1035LabelErrMsg, dns1035LabelFmt, "my-name", "abc-123")+ + " with an optional DNS subdomain prefix and '/' (e.g. 'example.com/MyVersion')") + } + + if len(version) == 0 { + errs = append(errs, "version part "+utilvalidation.EmptyError()) + } else if msgs := utilvalidation.IsDNS1035Label(version); len(msgs) != 0 { + errs = append(errs, prefixEach(msgs, "version part ")...) + } + return errs +} + +func prefixEach(msgs []string, prefix string) []string { + for i := range msgs { + msgs[i] = prefix + msgs[i] + } + return msgs +} diff --git a/pkg/apis/apiserverinternal/validation/validation_test.go b/pkg/apis/apiserverinternal/validation/validation_test.go index 87b8cd7fe80..fcaded290ee 100644 --- a/pkg/apis/apiserverinternal/validation/validation_test.go +++ b/pkg/apis/apiserverinternal/validation/validation_test.go @@ -61,6 +61,62 @@ func TestValidateServerStorageVersion(t *testing.T) { }, expectedErr: "", }, + { + ssv: apiserverinternal.ServerStorageVersion{ + APIServerID: "fea", + EncodingVersion: "mygroup.com/v2", + DecodableVersions: []string{"v1alpha1", "v1", "mygroup.com/v2"}, + }, + expectedErr: "", + }, + { + ssv: apiserverinternal.ServerStorageVersion{ + APIServerID: "fea", + EncodingVersion: "mygroup.com/v2", + DecodableVersions: []string{"mygroup.com/v2", "/v3"}, + }, + expectedErr: `[].decodableVersions[1]: Invalid value: "/v3": group part must be non-empty`, + }, + { + ssv: apiserverinternal.ServerStorageVersion{ + APIServerID: "fea", + EncodingVersion: "mygroup.com/v2", + DecodableVersions: []string{"mygroup.com/v2", "mygroup.com/"}, + }, + expectedErr: `[].decodableVersions[1]: Invalid value: "mygroup.com/": version part must be non-empty`, + }, + { + ssv: apiserverinternal.ServerStorageVersion{ + APIServerID: "fea", + EncodingVersion: "/v3", + DecodableVersions: []string{"mygroup.com/v2", "/v3"}, + }, + expectedErr: `[].encodingVersion: Invalid value: "/v3": group part must be non-empty`, + }, + { + ssv: apiserverinternal.ServerStorageVersion{ + APIServerID: "fea", + EncodingVersion: "v1", + DecodableVersions: []string{"v1", "mygroup_com/v2"}, + }, + expectedErr: `[].decodableVersions[1]: Invalid value: "mygroup_com/v2": group part a lowercase RFC 1123 subdomain must consist of lower case alphanumeric characters, '-' or '.', and must start and end with an alphanumeric character (e.g. 'example.com', regex used for validation is '[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*')`, + }, + { + ssv: apiserverinternal.ServerStorageVersion{ + APIServerID: "fea", + EncodingVersion: "v1", + DecodableVersions: []string{"v1", "mygroup.com/v2_"}, + }, + expectedErr: `[].decodableVersions[1]: Invalid value: "mygroup.com/v2_": version part a DNS-1035 label must consist of lower case alphanumeric characters or '-', start with an alphabetic character, and end with an alphanumeric character (e.g. 'my-name', or 'abc-123', regex used for validation is '[a-z]([-a-z0-9]*[a-z0-9])?')`, + }, + { + ssv: apiserverinternal.ServerStorageVersion{ + APIServerID: "fea", + EncodingVersion: "v1", + DecodableVersions: []string{"v1", "mygroup.com/v2/myresource"}, + }, + expectedErr: `[].decodableVersions[1]: Invalid value: "mygroup.com/v2/myresource": an apiVersion a DNS-1035 label must consist of lower case alphanumeric characters or '-', start with an alphabetic character, and end with an alphanumeric character (e.g. 'my-name', or 'abc-123', regex used for validation is '[a-z]([-a-z0-9]*[a-z0-9])?') with an optional DNS subdomain prefix and '/' (e.g. 'example.com/MyVersion')`, + }, } for _, tc := range cases { From 1c2d446648662529282a3bb1528a6dbb50700fdb Mon Sep 17 00:00:00 2001 From: Haowei Cai Date: Sun, 8 Nov 2020 19:06:30 -0800 Subject: [PATCH 10/11] require APIServerIdentity to be enabled to run StorageVersionAPI without APIServerIdentity enabled, stale apiserver leases won't be GC'ed and the same for stale storage version entries. In that case the storage migrator won't operate correctly without manual intervention. --- cmd/kube-apiserver/app/aggregator.go | 3 ++- staging/src/k8s.io/apiserver/pkg/endpoints/installer.go | 1 + staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go | 3 ++- staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go | 3 ++- 4 files changed, 7 insertions(+), 3 deletions(-) diff --git a/cmd/kube-apiserver/app/aggregator.go b/cmd/kube-apiserver/app/aggregator.go index 5e72a59f78a..c539cf235a4 100644 --- a/cmd/kube-apiserver/app/aggregator.go +++ b/cmd/kube-apiserver/app/aggregator.go @@ -68,7 +68,8 @@ func createAggregatorConfig( genericConfig.PostStartHooks = map[string]genericapiserver.PostStartHookConfigEntry{} genericConfig.RESTOptionsGetter = nil - if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) { + if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) && + utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) { // Add StorageVersionPrecondition handler to aggregator-apiserver. // The handler will block write requests to built-in resources until the // target resources' storage versions are up-to-date. diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go b/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go index b49a090aa2f..6549771ced8 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go @@ -513,6 +513,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag var resourceInfo *storageversion.ResourceInfo if utilfeature.DefaultFeatureGate.Enabled(features.StorageVersionAPI) && + utilfeature.DefaultFeatureGate.Enabled(features.APIServerIdentity) && isStorageVersionProvider && storageVersionProvider.StorageVersion() != nil { diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index d0f42684421..d7d60b213de 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -443,7 +443,8 @@ func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *A resourceInfos = append(resourceInfos, r...) } - if utilfeature.DefaultFeatureGate.Enabled(features.StorageVersionAPI) { + if utilfeature.DefaultFeatureGate.Enabled(features.StorageVersionAPI) && + utilfeature.DefaultFeatureGate.Enabled(features.APIServerIdentity) { // API installation happens before we start listening on the handlers, // therefore it is safe to register ResourceInfos here. The handler will block // write requests until the storage versions of the targeting resources are updated. diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index 30d187ec45d..59baa74c37f 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -267,7 +267,8 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg return nil }) - if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) { + if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) && + utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) { // Spawn a goroutine in aggregator apiserver to update storage version for // all built-in resources s.GenericAPIServer.AddPostStartHookOrDie("built-in-resources-storage-version-updater", func(context genericapiserver.PostStartHookContext) error { From 23f77ce7c62db3761580d06622fbfb1cedaa1293 Mon Sep 17 00:00:00 2001 From: Haowei Cai Date: Mon, 9 Nov 2020 15:23:13 -0800 Subject: [PATCH 11/11] fixup! apiserver correctly validates encoding/decodable versions --- pkg/apis/apiserverinternal/validation/validation.go | 12 ++++++------ .../apiserverinternal/validation/validation_test.go | 12 ++++++------ 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/pkg/apis/apiserverinternal/validation/validation.go b/pkg/apis/apiserverinternal/validation/validation.go index 928ffec7bb0..3b82baf690f 100644 --- a/pkg/apis/apiserverinternal/validation/validation.go +++ b/pkg/apis/apiserverinternal/validation/validation.go @@ -160,7 +160,7 @@ func validateStorageVersionCondition(conditions []apiserverinternal.StorageVersi } const dns1035LabelFmt string = "[a-z]([-a-z0-9]*[a-z0-9])?" -const dns1035LabelErrMsg string = "a DNS-1035 label must consist of lower case alphanumeric characters or '-', start with an alphabetic character, and end with an alphanumeric character" +const dns1035LabelErrMsg string = "a DNS-1035 label, which must consist of lower case alphanumeric characters or '-', start with an alphabetic character, and end with an alphanumeric character" // isValidAPIVersion tests whether the value passed is a valid apiVersion. A // valid apiVersion contains a version string that matches DNS_LABEL format, @@ -178,19 +178,19 @@ func isValidAPIVersion(apiVersion string) []string { var group string group, version = parts[0], parts[1] if len(group) == 0 { - errs = append(errs, "group part "+utilvalidation.EmptyError()) + errs = append(errs, "group part: "+utilvalidation.EmptyError()) } else if msgs := utilvalidation.IsDNS1123Subdomain(group); len(msgs) != 0 { - errs = append(errs, prefixEach(msgs, "group part ")...) + errs = append(errs, prefixEach(msgs, "group part: ")...) } default: - return append(errs, "an apiVersion "+utilvalidation.RegexError(dns1035LabelErrMsg, dns1035LabelFmt, "my-name", "abc-123")+ + return append(errs, "an apiVersion is "+utilvalidation.RegexError(dns1035LabelErrMsg, dns1035LabelFmt, "my-name", "abc-123")+ " with an optional DNS subdomain prefix and '/' (e.g. 'example.com/MyVersion')") } if len(version) == 0 { - errs = append(errs, "version part "+utilvalidation.EmptyError()) + errs = append(errs, "version part: "+utilvalidation.EmptyError()) } else if msgs := utilvalidation.IsDNS1035Label(version); len(msgs) != 0 { - errs = append(errs, prefixEach(msgs, "version part ")...) + errs = append(errs, prefixEach(msgs, "version part: ")...) } return errs } diff --git a/pkg/apis/apiserverinternal/validation/validation_test.go b/pkg/apis/apiserverinternal/validation/validation_test.go index fcaded290ee..0d0b3bae893 100644 --- a/pkg/apis/apiserverinternal/validation/validation_test.go +++ b/pkg/apis/apiserverinternal/validation/validation_test.go @@ -75,7 +75,7 @@ func TestValidateServerStorageVersion(t *testing.T) { EncodingVersion: "mygroup.com/v2", DecodableVersions: []string{"mygroup.com/v2", "/v3"}, }, - expectedErr: `[].decodableVersions[1]: Invalid value: "/v3": group part must be non-empty`, + expectedErr: `[].decodableVersions[1]: Invalid value: "/v3": group part: must be non-empty`, }, { ssv: apiserverinternal.ServerStorageVersion{ @@ -83,7 +83,7 @@ func TestValidateServerStorageVersion(t *testing.T) { EncodingVersion: "mygroup.com/v2", DecodableVersions: []string{"mygroup.com/v2", "mygroup.com/"}, }, - expectedErr: `[].decodableVersions[1]: Invalid value: "mygroup.com/": version part must be non-empty`, + expectedErr: `[].decodableVersions[1]: Invalid value: "mygroup.com/": version part: must be non-empty`, }, { ssv: apiserverinternal.ServerStorageVersion{ @@ -91,7 +91,7 @@ func TestValidateServerStorageVersion(t *testing.T) { EncodingVersion: "/v3", DecodableVersions: []string{"mygroup.com/v2", "/v3"}, }, - expectedErr: `[].encodingVersion: Invalid value: "/v3": group part must be non-empty`, + expectedErr: `[].encodingVersion: Invalid value: "/v3": group part: must be non-empty`, }, { ssv: apiserverinternal.ServerStorageVersion{ @@ -99,7 +99,7 @@ func TestValidateServerStorageVersion(t *testing.T) { EncodingVersion: "v1", DecodableVersions: []string{"v1", "mygroup_com/v2"}, }, - expectedErr: `[].decodableVersions[1]: Invalid value: "mygroup_com/v2": group part a lowercase RFC 1123 subdomain must consist of lower case alphanumeric characters, '-' or '.', and must start and end with an alphanumeric character (e.g. 'example.com', regex used for validation is '[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*')`, + expectedErr: `[].decodableVersions[1]: Invalid value: "mygroup_com/v2": group part: a lowercase RFC 1123 subdomain must consist of lower case alphanumeric characters, '-' or '.', and must start and end with an alphanumeric character (e.g. 'example.com', regex used for validation is '[a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*')`, }, { ssv: apiserverinternal.ServerStorageVersion{ @@ -107,7 +107,7 @@ func TestValidateServerStorageVersion(t *testing.T) { EncodingVersion: "v1", DecodableVersions: []string{"v1", "mygroup.com/v2_"}, }, - expectedErr: `[].decodableVersions[1]: Invalid value: "mygroup.com/v2_": version part a DNS-1035 label must consist of lower case alphanumeric characters or '-', start with an alphabetic character, and end with an alphanumeric character (e.g. 'my-name', or 'abc-123', regex used for validation is '[a-z]([-a-z0-9]*[a-z0-9])?')`, + expectedErr: `[].decodableVersions[1]: Invalid value: "mygroup.com/v2_": version part: a DNS-1035 label must consist of lower case alphanumeric characters or '-', start with an alphabetic character, and end with an alphanumeric character (e.g. 'my-name', or 'abc-123', regex used for validation is '[a-z]([-a-z0-9]*[a-z0-9])?')`, }, { ssv: apiserverinternal.ServerStorageVersion{ @@ -115,7 +115,7 @@ func TestValidateServerStorageVersion(t *testing.T) { EncodingVersion: "v1", DecodableVersions: []string{"v1", "mygroup.com/v2/myresource"}, }, - expectedErr: `[].decodableVersions[1]: Invalid value: "mygroup.com/v2/myresource": an apiVersion a DNS-1035 label must consist of lower case alphanumeric characters or '-', start with an alphabetic character, and end with an alphanumeric character (e.g. 'my-name', or 'abc-123', regex used for validation is '[a-z]([-a-z0-9]*[a-z0-9])?') with an optional DNS subdomain prefix and '/' (e.g. 'example.com/MyVersion')`, + expectedErr: `[].decodableVersions[1]: Invalid value: "mygroup.com/v2/myresource": an apiVersion is a DNS-1035 label, which must consist of lower case alphanumeric characters or '-', start with an alphabetic character, and end with an alphanumeric character (e.g. 'my-name', or 'abc-123', regex used for validation is '[a-z]([-a-z0-9]*[a-z0-9])?') with an optional DNS subdomain prefix and '/' (e.g. 'example.com/MyVersion')`, }, }