From 369475681679e1cb68c23c3355542599a3c0e1e3 Mon Sep 17 00:00:00 2001 From: Chao Xu Date: Mon, 24 Feb 2020 15:35:14 -0800 Subject: [PATCH] Collect storage versions as ResourceInfo when installing API endpoints. Co-authored-by: Haowei Cai --- .../apiserver/pkg/endpoints/apiserver_test.go | 12 +- .../apiserver/pkg/endpoints/groupversion.go | 20 ++- .../apiserver/pkg/endpoints/installer.go | 118 +++++++++++------- .../apiserver/pkg/server/genericapiserver.go | 16 ++- .../apiserver/pkg/storageversion/manager.go | 72 +++++++++-- .../pkg/storageversion/manager_test.go | 62 +++++++++ 6 files changed, 238 insertions(+), 62 deletions(-) create mode 100644 staging/src/k8s.io/apiserver/pkg/storageversion/manager_test.go diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go b/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go index 15786c002d0..f0da9ede17c 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/apiserver_test.go @@ -254,7 +254,7 @@ func handleInternal(storage map[string]rest.Storage, admissionControl admission. group.GroupVersion = grouplessGroupVersion group.OptionsExternalVersion = &grouplessGroupVersion group.Serializer = codecs - if err := (&group).InstallREST(container); err != nil { + if _, err := (&group).InstallREST(container); err != nil { panic(fmt.Sprintf("unable to install container %s: %v", group.GroupVersion, err)) } } @@ -266,7 +266,7 @@ func handleInternal(storage map[string]rest.Storage, admissionControl admission. group.GroupVersion = testGroupVersion group.OptionsExternalVersion = &testGroupVersion group.Serializer = codecs - if err := (&group).InstallREST(container); err != nil { + if _, err := (&group).InstallREST(container); err != nil { panic(fmt.Sprintf("unable to install container %s: %v", group.GroupVersion, err)) } } @@ -278,7 +278,7 @@ func handleInternal(storage map[string]rest.Storage, admissionControl admission. group.GroupVersion = newGroupVersion group.OptionsExternalVersion = &newGroupVersion group.Serializer = codecs - if err := (&group).InstallREST(container); err != nil { + if _, err := (&group).InstallREST(container); err != nil { panic(fmt.Sprintf("unable to install container %s: %v", group.GroupVersion, err)) } } @@ -3678,7 +3678,7 @@ func TestParentResourceIsRequired(t *testing.T) { ParameterCodec: parameterCodec, } container := restful.NewContainer() - if err := group.InstallREST(container); err == nil { + if _, err := group.InstallREST(container); err == nil { t.Fatal("expected error") } @@ -3710,7 +3710,7 @@ func TestParentResourceIsRequired(t *testing.T) { ParameterCodec: parameterCodec, } container = restful.NewContainer() - if err := group.InstallREST(container); err != nil { + if _, err := group.InstallREST(container); err != nil { t.Fatal(err) } @@ -4566,7 +4566,7 @@ func TestXGSubresource(t *testing.T) { Serializer: codecs, } - if err := (&group).InstallREST(container); err != nil { + if _, err := (&group).InstallREST(container); err != nil { panic(fmt.Sprintf("unable to install container %s: %v", group.GroupVersion, err)) } diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/groupversion.go b/staging/src/k8s.io/apiserver/pkg/endpoints/groupversion.go index e9c57f18e3a..22b97366142 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/groupversion.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/groupversion.go @@ -32,6 +32,7 @@ import ( "k8s.io/apiserver/pkg/endpoints/discovery" "k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager" "k8s.io/apiserver/pkg/registry/rest" + "k8s.io/apiserver/pkg/storageversion" openapiproto "k8s.io/kube-openapi/pkg/util/proto" ) @@ -96,7 +97,7 @@ type APIGroupVersion struct { // InstallREST registers the REST handlers (storage, watch, proxy and redirect) into a restful Container. // It is expected that the provided path root prefix will serve all operations. Root MUST NOT end // in a slash. -func (g *APIGroupVersion) InstallREST(container *restful.Container) error { +func (g *APIGroupVersion) InstallREST(container *restful.Container) ([]*storageversion.ResourceInfo, error) { prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version) installer := &APIInstaller{ group: g, @@ -104,11 +105,24 @@ func (g *APIGroupVersion) InstallREST(container *restful.Container) error { minRequestTimeout: g.MinRequestTimeout, } - apiResources, ws, registrationErrors := installer.Install() + apiResources, resourceInfos, ws, registrationErrors := installer.Install() versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources}) versionDiscoveryHandler.AddToWebService(ws) container.Add(ws) - return utilerrors.NewAggregate(registrationErrors) + return removeNonPersistedResources(resourceInfos), utilerrors.NewAggregate(registrationErrors) +} + +func removeNonPersistedResources(infos []*storageversion.ResourceInfo) []*storageversion.ResourceInfo { + var filtered []*storageversion.ResourceInfo + for _, info := range infos { + // if EncodingVersion is empty, then the apiserver does not + // need to register this resource via the storage version API, + // thus we can remove it. + if info != nil && len(info.EncodingVersion) > 0 { + filtered = append(filtered, info) + } + } + return filtered } // staticLister implements the APIResourceLister interface diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go b/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go index cabae0c464c..b49a090aa2f 100644 --- a/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go +++ b/staging/src/k8s.io/apiserver/pkg/endpoints/installer.go @@ -43,6 +43,7 @@ import ( utilwarning "k8s.io/apiserver/pkg/endpoints/warning" "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/registry/rest" + "k8s.io/apiserver/pkg/storageversion" utilfeature "k8s.io/apiserver/pkg/util/feature" versioninfo "k8s.io/component-base/version" ) @@ -94,8 +95,9 @@ var toDiscoveryKubeVerb = map[string]string{ } // Install handlers for API resources. -func (a *APIInstaller) Install() ([]metav1.APIResource, *restful.WebService, []error) { +func (a *APIInstaller) Install() ([]metav1.APIResource, []*storageversion.ResourceInfo, *restful.WebService, []error) { var apiResources []metav1.APIResource + var resourceInfos []*storageversion.ResourceInfo var errors []error ws := a.newWebService() @@ -108,15 +110,18 @@ func (a *APIInstaller) Install() ([]metav1.APIResource, *restful.WebService, []e } sort.Strings(paths) for _, path := range paths { - apiResource, err := a.registerResourceHandlers(path, a.group.Storage[path], ws) + apiResource, resourceInfo, err := a.registerResourceHandlers(path, a.group.Storage[path], ws) if err != nil { errors = append(errors, fmt.Errorf("error in registering resource: %s, %v", path, err)) } if apiResource != nil { apiResources = append(apiResources, *apiResource) } + if resourceInfo != nil { + resourceInfos = append(resourceInfos, resourceInfo) + } } - return apiResources, ws, errors + return apiResources, resourceInfos, ws, errors } // newWebService creates a new restful webservice with the api installer's prefix and version. @@ -182,7 +187,7 @@ func GetResourceKind(groupVersion schema.GroupVersion, storage rest.Storage, typ return fqKindToRegister, nil } -func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, error) { +func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) { admit := a.group.Admit optionsExternalVersion := a.group.GroupVersion @@ -192,19 +197,19 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag resource, subresource, err := splitSubresource(path) if err != nil { - return nil, err + return nil, nil, err } group, version := a.group.GroupVersion.Group, a.group.GroupVersion.Version fqKindToRegister, err := GetResourceKind(a.group.GroupVersion, storage, a.group.Typer) if err != nil { - return nil, err + return nil, nil, err } versionedPtr, err := a.group.Creater.New(fqKindToRegister) if err != nil { - return nil, err + return nil, nil, err } defaultVersionedObject := indirectArbitraryPointer(versionedPtr) kind := fqKindToRegister.Kind @@ -215,18 +220,18 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag if isSubresource { parentStorage, ok := a.group.Storage[resource] if !ok { - return nil, fmt.Errorf("missing parent storage: %q", resource) + return nil, nil, fmt.Errorf("missing parent storage: %q", resource) } scoper, ok := parentStorage.(rest.Scoper) if !ok { - return nil, fmt.Errorf("%q must implement scoper", resource) + return nil, nil, fmt.Errorf("%q must implement scoper", resource) } namespaceScoped = scoper.NamespaceScoped() } else { scoper, ok := storage.(rest.Scoper) if !ok { - return nil, fmt.Errorf("%q must implement scoper", resource) + return nil, nil, fmt.Errorf("%q must implement scoper", resource) } namespaceScoped = scoper.NamespaceScoped() } @@ -255,7 +260,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag versionedExportOptions, err := a.group.Creater.New(optionsExternalVersion.WithKind("ExportOptions")) if err != nil { - return nil, err + return nil, nil, err } if isNamedCreater { @@ -267,30 +272,30 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag list := lister.NewList() listGVKs, _, err := a.group.Typer.ObjectKinds(list) if err != nil { - return nil, err + return nil, nil, err } versionedListPtr, err := a.group.Creater.New(a.group.GroupVersion.WithKind(listGVKs[0].Kind)) if err != nil { - return nil, err + return nil, nil, err } versionedList = indirectArbitraryPointer(versionedListPtr) } versionedListOptions, err := a.group.Creater.New(optionsExternalVersion.WithKind("ListOptions")) if err != nil { - return nil, err + return nil, nil, err } versionedCreateOptions, err := a.group.Creater.New(optionsExternalVersion.WithKind("CreateOptions")) if err != nil { - return nil, err + return nil, nil, err } versionedPatchOptions, err := a.group.Creater.New(optionsExternalVersion.WithKind("PatchOptions")) if err != nil { - return nil, err + return nil, nil, err } versionedUpdateOptions, err := a.group.Creater.New(optionsExternalVersion.WithKind("UpdateOptions")) if err != nil { - return nil, err + return nil, nil, err } var versionedDeleteOptions runtime.Object @@ -299,7 +304,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag if isGracefulDeleter { versionedDeleteOptions, err = a.group.Creater.New(optionsExternalVersion.WithKind("DeleteOptions")) if err != nil { - return nil, err + return nil, nil, err } versionedDeleterObject = indirectArbitraryPointer(versionedDeleteOptions) @@ -310,7 +315,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag versionedStatusPtr, err := a.group.Creater.New(optionsExternalVersion.WithKind("Status")) if err != nil { - return nil, err + return nil, nil, err } versionedStatus := indirectArbitraryPointer(versionedStatusPtr) var ( @@ -323,14 +328,14 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag getOptions, getSubpath, _ = getterWithOptions.NewGetOptions() getOptionsInternalKinds, _, err := a.group.Typer.ObjectKinds(getOptions) if err != nil { - return nil, err + return nil, nil, err } getOptionsInternalKind = getOptionsInternalKinds[0] versionedGetOptions, err = a.group.Creater.New(a.group.GroupVersion.WithKind(getOptionsInternalKind.Kind)) if err != nil { versionedGetOptions, err = a.group.Creater.New(optionsExternalVersion.WithKind(getOptionsInternalKind.Kind)) if err != nil { - return nil, err + return nil, nil, err } } isGetter = true @@ -340,7 +345,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag if isWatcher { versionedWatchEventPtr, err := a.group.Creater.New(a.group.GroupVersion.WithKind("WatchEvent")) if err != nil { - return nil, err + return nil, nil, err } versionedWatchEvent = indirectArbitraryPointer(versionedWatchEventPtr) } @@ -356,7 +361,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag if connectOptions != nil { connectOptionsInternalKinds, _, err := a.group.Typer.ObjectKinds(connectOptions) if err != nil { - return nil, err + return nil, nil, err } connectOptionsInternalKind = connectOptionsInternalKinds[0] @@ -364,7 +369,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag if err != nil { versionedConnectOptions, err = a.group.Creater.New(optionsExternalVersion.WithKind(connectOptionsInternalKind.Kind)) if err != nil { - return nil, err + return nil, nil, err } } } @@ -388,7 +393,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag tableProvider, isTableProvider := storage.(rest.TableConvertor) if isLister && !isTableProvider { // All listers must implement TableProvider - return nil, fmt.Errorf("%q must implement TableConvertor", resource) + return nil, nil, fmt.Errorf("%q must implement TableConvertor", resource) } var apiResource metav1.APIResource @@ -398,7 +403,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag versioner := storageVersionProvider.StorageVersion() gvk, err := getStorageVersionKind(versioner, storage, a.group.Typer) if err != nil { - return nil, err + return nil, nil, err } apiResource.StorageVersionHash = discovery.StorageVersionHash(gvk.Group, gvk.Version, gvk.Kind) } @@ -506,6 +511,29 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag } } + var resourceInfo *storageversion.ResourceInfo + if utilfeature.DefaultFeatureGate.Enabled(features.StorageVersionAPI) && + isStorageVersionProvider && + storageVersionProvider.StorageVersion() != nil { + + versioner := storageVersionProvider.StorageVersion() + encodingGVK, err := getStorageVersionKind(versioner, storage, a.group.Typer) + if err != nil { + return nil, nil, err + } + resourceInfo = &storageversion.ResourceInfo{ + GroupResource: schema.GroupResource{ + Group: a.group.GroupVersion.Group, + Resource: apiResource.Name, + }, + EncodingVersion: encodingGVK.GroupVersion().String(), + // We record EquivalentResourceMapper first instead of calculate + // DecodableVersions immediately because API installation must + // be completed first for us to know equivalent APIs + EquivalentResourceMapper: a.group.EquivalentResourceRegistry, + } + } + // Create Routes for the actions. // TODO: Add status documentation using Returns() // Errors (see api/errors/errors.go as well as go-restful router): @@ -525,7 +553,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag for _, s := range a.group.Serializer.SupportedMediaTypes() { if len(s.MediaTypeSubType) == 0 || len(s.MediaTypeType) == 0 { - return nil, fmt.Errorf("all serializers in the group Serializer must have MediaTypeType and MediaTypeSubType set: %s", s.MediaType) + return nil, nil, fmt.Errorf("all serializers in the group Serializer must have MediaTypeType and MediaTypeSubType set: %s", s.MediaType) } } mediaTypes, streamMediaTypes := negotiation.MediaTypesForSerializer(a.group.Serializer) @@ -573,7 +601,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag isSubresource, ) if err != nil { - return nil, fmt.Errorf("failed to create field manager: %v", err) + return nil, nil, fmt.Errorf("failed to create field manager: %v", err) } } for _, action := range actions { @@ -608,7 +636,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag kubeVerbs[kubeVerb] = struct{}{} } } else { - return nil, fmt.Errorf("unknown action verb for discovery: %s", action.Verb) + return nil, nil, fmt.Errorf("unknown action verb for discovery: %s", action.Verb) } routes := []*restful.RouteBuilder{} @@ -617,12 +645,12 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag if isSubresource { parentStorage, ok := a.group.Storage[resource] if !ok { - return nil, fmt.Errorf("missing parent storage: %q", resource) + return nil, nil, fmt.Errorf("missing parent storage: %q", resource) } fqParentKind, err := GetResourceKind(a.group.GroupVersion, parentStorage, a.group.Typer) if err != nil { - return nil, err + return nil, nil, err } kind = fqParentKind.Kind } @@ -681,12 +709,12 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag Writes(producedObject) if isGetterWithOptions { if err := AddObjectParams(ws, route, versionedGetOptions); err != nil { - return nil, err + return nil, nil, err } } if isExporter { if err := AddObjectParams(ws, route, versionedExportOptions); err != nil { - return nil, err + return nil, nil, err } } addParams(route, action.Params) @@ -708,7 +736,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag Returns(http.StatusOK, "OK", versionedList). Writes(versionedList) if err := AddObjectParams(ws, route, versionedListOptions); err != nil { - return nil, err + return nil, nil, err } switch { case isLister && isWatcher: @@ -747,7 +775,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag Reads(defaultVersionedObject). Writes(producedObject) if err := AddObjectParams(ws, route, versionedUpdateOptions); err != nil { - return nil, err + return nil, nil, err } addParams(route, action.Params) routes = append(routes, route) @@ -778,7 +806,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag Reads(metav1.Patch{}). Writes(producedObject) if err := AddObjectParams(ws, route, versionedPatchOptions); err != nil { - return nil, err + return nil, nil, err } addParams(route, action.Params) routes = append(routes, route) @@ -811,7 +839,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag Reads(defaultVersionedObject). Writes(producedObject) if err := AddObjectParams(ws, route, versionedCreateOptions); err != nil { - return nil, err + return nil, nil, err } addParams(route, action.Params) routes = append(routes, route) @@ -841,7 +869,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag route.Reads(versionedDeleterObject) route.ParameterNamed("body").Required(false) if err := AddObjectParams(ws, route, versionedDeleteOptions); err != nil { - return nil, err + return nil, nil, err } } addParams(route, action.Params) @@ -866,11 +894,11 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag route.Reads(versionedDeleterObject) route.ParameterNamed("body").Required(false) if err := AddObjectParams(ws, route, versionedDeleteOptions); err != nil { - return nil, err + return nil, nil, err } } if err := AddObjectParams(ws, route, versionedListOptions, "watch", "allowWatchBookmarks"); err != nil { - return nil, err + return nil, nil, err } addParams(route, action.Params) routes = append(routes, route) @@ -893,7 +921,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag Returns(http.StatusOK, "OK", versionedWatchEvent). Writes(versionedWatchEvent) if err := AddObjectParams(ws, route, versionedListOptions); err != nil { - return nil, err + return nil, nil, err } addParams(route, action.Params) routes = append(routes, route) @@ -916,7 +944,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag Returns(http.StatusOK, "OK", versionedWatchEvent). Writes(versionedWatchEvent) if err := AddObjectParams(ws, route, versionedListOptions); err != nil { - return nil, err + return nil, nil, err } addParams(route, action.Params) routes = append(routes, route) @@ -943,7 +971,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag Writes(connectProducedObject) if versionedConnectOptions != nil { if err := AddObjectParams(ws, route, versionedConnectOptions); err != nil { - return nil, err + return nil, nil, err } } addParams(route, action.Params) @@ -957,7 +985,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag } } default: - return nil, fmt.Errorf("unrecognized action verb: %s", action.Verb) + return nil, nil, fmt.Errorf("unrecognized action verb: %s", action.Verb) } for _, route := range routes { route.Metadata(ROUTE_META_GVK, metav1.GroupVersionKind{ @@ -993,7 +1021,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag // Record the existence of the GVR and the corresponding GVK a.group.EquivalentResourceRegistry.RegisterKindFor(reqScope.Resource, reqScope.Subresource, fqKindToRegister) - return &apiResource, nil + return &apiResource, resourceInfo, nil } // indirectArbitraryPointer returns *ptrToObject for an arbitrary pointer diff --git a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go index 07cef25bbea..d0f42684421 100644 --- a/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go +++ b/staging/src/k8s.io/apiserver/pkg/server/genericapiserver.go @@ -45,6 +45,7 @@ import ( "k8s.io/apiserver/pkg/registry/rest" "k8s.io/apiserver/pkg/server/healthz" "k8s.io/apiserver/pkg/server/routes" + "k8s.io/apiserver/pkg/storageversion" utilfeature "k8s.io/apiserver/pkg/util/feature" utilopenapi "k8s.io/apiserver/pkg/util/openapi" restclient "k8s.io/client-go/rest" @@ -199,6 +200,9 @@ type GenericAPIServer struct { // APIServerID is the ID of this API server APIServerID string + + // StorageVersionManager holds the storage versions of the API resources installed by this server. + StorageVersionManager storageversion.Manager } // DelegationTarget is an interface which allows for composition of API servers with top level handling that works @@ -409,6 +413,7 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) (<-chan // installAPIResources is a private method for installing the REST storage backing each api groupversionresource func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo, openAPIModels openapiproto.Models) error { + var resourceInfos []*storageversion.ResourceInfo for _, groupVersion := range apiGroupInfo.PrioritizedVersions { if len(apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version]) == 0 { klog.Warningf("Skipping API %v because it has no resources.", groupVersion) @@ -431,9 +436,18 @@ func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *A apiGroupVersion.MaxRequestBodyBytes = s.maxRequestBodyBytes - if err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer); err != nil { + r, err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer) + if err != nil { return fmt.Errorf("unable to setup API %v: %v", apiGroupInfo, err) } + resourceInfos = append(resourceInfos, r...) + } + + if utilfeature.DefaultFeatureGate.Enabled(features.StorageVersionAPI) { + // API installation happens before we start listening on the handlers, + // therefore it is safe to register ResourceInfos here. The handler will block + // write requests until the storage versions of the targeting resources are updated. + s.StorageVersionManager.AddResourceInfo(resourceInfos...) } return nil diff --git a/staging/src/k8s.io/apiserver/pkg/storageversion/manager.go b/staging/src/k8s.io/apiserver/pkg/storageversion/manager.go index 0032a8e768f..03e21a4d6d3 100644 --- a/staging/src/k8s.io/apiserver/pkg/storageversion/manager.go +++ b/staging/src/k8s.io/apiserver/pkg/storageversion/manager.go @@ -18,6 +18,7 @@ package storageversion import ( "fmt" + "sort" "sync" "sync/atomic" @@ -98,7 +99,10 @@ func (s *defaultManager) AddResourceInfo(resources ...*ResourceInfo) { func (s *defaultManager) addPendingManagedStatusLocked(r *ResourceInfo) { gvrs := r.EquivalentResourceMapper.EquivalentResourcesFor(r.GroupResource.WithVersion(""), "") for _, gvr := range gvrs { - s.managedStatus[gvr.GroupResource()] = &updateStatus{} + gr := gvr.GroupResource() + if _, ok := s.managedStatus[gr]; !ok { + s.managedStatus[gr] = &updateStatus{} + } } } @@ -112,22 +116,37 @@ func (s *defaultManager) UpdateStorageVersions(kubeAPIServerClientConfig *rest.C sc := clientset.InternalV1alpha1().StorageVersions() s.mu.RLock() - resources := []*ResourceInfo{} + resources := []ResourceInfo{} for resource := range s.managedResourceInfos { - resources = append(resources, resource) + resources = append(resources, *resource) } s.mu.RUnlock() hasFailure := false - for _, r := range resources { + // Sorting the list to make sure we have a consistent dedup result, and + // therefore avoid creating unnecessarily duplicated StorageVersion objects. + // For example, extensions.ingresses and networking.k8s.io.ingresses share + // the same underlying storage. Without sorting, in an HA cluster, one + // apiserver may dedup and update StorageVersion for extensions.ingresses, + // while another apiserver may dedup and update StorageVersion for + // networking.k8s.io.ingresses. The storage migrator (which migrates objects + // per GroupResource) will migrate these resources twice, since both + // StorageVersion objects have CommonEncodingVersion (each with one server registered). + sortResourceInfosByGroupResource(resources) + for _, r := range dedupResourceInfos(resources) { dv := decodableVersions(r.EquivalentResourceMapper, r.GroupResource) - if err := updateStorageVersionFor(sc, serverID, r.GroupResource, r.EncodingVersion, dv); err != nil { + gr := r.GroupResource + // Group must be a valid subdomain in DNS (RFC 1123) + if len(gr.Group) == 0 { + gr.Group = "core" + } + if err := updateStorageVersionFor(sc, serverID, gr, r.EncodingVersion, dv); err != nil { utilruntime.HandleError(fmt.Errorf("failed to update storage version for %v: %v", r.GroupResource, err)) - s.recordStatusFailure(r, err) + s.recordStatusFailure(&r, err) hasFailure = true continue } klog.V(2).Infof("successfully updated storage version for %v", r.GroupResource) - s.recordStatusSuccess(r) + s.recordStatusSuccess(&r) } if hasFailure { return @@ -136,6 +155,45 @@ func (s *defaultManager) UpdateStorageVersions(kubeAPIServerClientConfig *rest.C s.setComplete() } +// dedupResourceInfos dedups ResourceInfos with the same underlying storage. +// ResourceInfos from the same Group with different Versions share the same underlying storage. +// ResourceInfos from different Groups may share the same underlying storage, e.g. +// networking.k8s.io ingresses and extensions ingresses. The StorageVersion manager +// only needs to update one StorageVersion for the equivalent Groups. +func dedupResourceInfos(infos []ResourceInfo) []ResourceInfo { + var ret []ResourceInfo + seen := make(map[schema.GroupResource]struct{}) + for _, info := range infos { + gr := info.GroupResource + if _, ok := seen[gr]; ok { + continue + } + gvrs := info.EquivalentResourceMapper.EquivalentResourcesFor(gr.WithVersion(""), "") + for _, gvr := range gvrs { + seen[gvr.GroupResource()] = struct{}{} + } + ret = append(ret, info) + } + return ret +} + +func sortResourceInfosByGroupResource(infos []ResourceInfo) { + sort.Sort(byGroupResource(infos)) +} + +type byGroupResource []ResourceInfo + +func (s byGroupResource) Len() int { return len(s) } + +func (s byGroupResource) Less(i, j int) bool { + if s[i].GroupResource.Group == s[j].GroupResource.Group { + return s[i].GroupResource.Resource < s[j].GroupResource.Resource + } + return s[i].GroupResource.Group < s[j].GroupResource.Group +} + +func (s byGroupResource) Swap(i, j int) { s[i], s[j] = s[j], s[i] } + // recordStatusSuccess marks updated ResourceInfo as completed. func (s *defaultManager) recordStatusSuccess(r *ResourceInfo) { s.mu.Lock() diff --git a/staging/src/k8s.io/apiserver/pkg/storageversion/manager_test.go b/staging/src/k8s.io/apiserver/pkg/storageversion/manager_test.go new file mode 100644 index 00000000000..0077cbdf51e --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storageversion/manager_test.go @@ -0,0 +1,62 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storageversion + +import ( + "reflect" + "testing" + + "github.com/google/go-cmp/cmp" + "k8s.io/apimachinery/pkg/runtime/schema" +) + +func TestSortResourceInfosByGroupResource(t *testing.T) { + tests := []struct { + infos []ResourceInfo + expected []ResourceInfo + }{ + { + infos: nil, + expected: nil, + }, + { + infos: []ResourceInfo{}, + expected: []ResourceInfo{}, + }, + { + infos: []ResourceInfo{ + {GroupResource: schema.GroupResource{Group: "", Resource: "pods"}}, + {GroupResource: schema.GroupResource{Group: "", Resource: "nodes"}}, + {GroupResource: schema.GroupResource{Group: "networking.k8s.io", Resource: "ingresses"}}, + {GroupResource: schema.GroupResource{Group: "extensions", Resource: "ingresses"}}, + }, + expected: []ResourceInfo{ + {GroupResource: schema.GroupResource{Group: "", Resource: "nodes"}}, + {GroupResource: schema.GroupResource{Group: "", Resource: "pods"}}, + {GroupResource: schema.GroupResource{Group: "extensions", Resource: "ingresses"}}, + {GroupResource: schema.GroupResource{Group: "networking.k8s.io", Resource: "ingresses"}}, + }, + }, + } + + for _, tc := range tests { + sortResourceInfosByGroupResource(tc.infos) + if e, a := tc.expected, tc.infos; !reflect.DeepEqual(e, a) { + t.Errorf("unexpected: %v", cmp.Diff(e, a)) + } + } +}