Collect storage versions as ResourceInfo when installing API endpoints.

Co-authored-by: Haowei Cai <haoweic@google.com>
This commit is contained in:
Chao Xu 2020-02-24 15:35:14 -08:00 committed by Haowei Cai
parent 56369375e0
commit 3694756816
6 changed files with 238 additions and 62 deletions

View File

@ -254,7 +254,7 @@ func handleInternal(storage map[string]rest.Storage, admissionControl admission.
group.GroupVersion = grouplessGroupVersion
group.OptionsExternalVersion = &grouplessGroupVersion
group.Serializer = codecs
if err := (&group).InstallREST(container); err != nil {
if _, err := (&group).InstallREST(container); err != nil {
panic(fmt.Sprintf("unable to install container %s: %v", group.GroupVersion, err))
}
}
@ -266,7 +266,7 @@ func handleInternal(storage map[string]rest.Storage, admissionControl admission.
group.GroupVersion = testGroupVersion
group.OptionsExternalVersion = &testGroupVersion
group.Serializer = codecs
if err := (&group).InstallREST(container); err != nil {
if _, err := (&group).InstallREST(container); err != nil {
panic(fmt.Sprintf("unable to install container %s: %v", group.GroupVersion, err))
}
}
@ -278,7 +278,7 @@ func handleInternal(storage map[string]rest.Storage, admissionControl admission.
group.GroupVersion = newGroupVersion
group.OptionsExternalVersion = &newGroupVersion
group.Serializer = codecs
if err := (&group).InstallREST(container); err != nil {
if _, err := (&group).InstallREST(container); err != nil {
panic(fmt.Sprintf("unable to install container %s: %v", group.GroupVersion, err))
}
}
@ -3678,7 +3678,7 @@ func TestParentResourceIsRequired(t *testing.T) {
ParameterCodec: parameterCodec,
}
container := restful.NewContainer()
if err := group.InstallREST(container); err == nil {
if _, err := group.InstallREST(container); err == nil {
t.Fatal("expected error")
}
@ -3710,7 +3710,7 @@ func TestParentResourceIsRequired(t *testing.T) {
ParameterCodec: parameterCodec,
}
container = restful.NewContainer()
if err := group.InstallREST(container); err != nil {
if _, err := group.InstallREST(container); err != nil {
t.Fatal(err)
}
@ -4566,7 +4566,7 @@ func TestXGSubresource(t *testing.T) {
Serializer: codecs,
}
if err := (&group).InstallREST(container); err != nil {
if _, err := (&group).InstallREST(container); err != nil {
panic(fmt.Sprintf("unable to install container %s: %v", group.GroupVersion, err))
}

View File

@ -32,6 +32,7 @@ import (
"k8s.io/apiserver/pkg/endpoints/discovery"
"k8s.io/apiserver/pkg/endpoints/handlers/fieldmanager"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/storageversion"
openapiproto "k8s.io/kube-openapi/pkg/util/proto"
)
@ -96,7 +97,7 @@ type APIGroupVersion struct {
// InstallREST registers the REST handlers (storage, watch, proxy and redirect) into a restful Container.
// It is expected that the provided path root prefix will serve all operations. Root MUST NOT end
// in a slash.
func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
func (g *APIGroupVersion) InstallREST(container *restful.Container) ([]*storageversion.ResourceInfo, error) {
prefix := path.Join(g.Root, g.GroupVersion.Group, g.GroupVersion.Version)
installer := &APIInstaller{
group: g,
@ -104,11 +105,24 @@ func (g *APIGroupVersion) InstallREST(container *restful.Container) error {
minRequestTimeout: g.MinRequestTimeout,
}
apiResources, ws, registrationErrors := installer.Install()
apiResources, resourceInfos, ws, registrationErrors := installer.Install()
versionDiscoveryHandler := discovery.NewAPIVersionHandler(g.Serializer, g.GroupVersion, staticLister{apiResources})
versionDiscoveryHandler.AddToWebService(ws)
container.Add(ws)
return utilerrors.NewAggregate(registrationErrors)
return removeNonPersistedResources(resourceInfos), utilerrors.NewAggregate(registrationErrors)
}
func removeNonPersistedResources(infos []*storageversion.ResourceInfo) []*storageversion.ResourceInfo {
var filtered []*storageversion.ResourceInfo
for _, info := range infos {
// if EncodingVersion is empty, then the apiserver does not
// need to register this resource via the storage version API,
// thus we can remove it.
if info != nil && len(info.EncodingVersion) > 0 {
filtered = append(filtered, info)
}
}
return filtered
}
// staticLister implements the APIResourceLister interface

View File

@ -43,6 +43,7 @@ import (
utilwarning "k8s.io/apiserver/pkg/endpoints/warning"
"k8s.io/apiserver/pkg/features"
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/storageversion"
utilfeature "k8s.io/apiserver/pkg/util/feature"
versioninfo "k8s.io/component-base/version"
)
@ -94,8 +95,9 @@ var toDiscoveryKubeVerb = map[string]string{
}
// Install handlers for API resources.
func (a *APIInstaller) Install() ([]metav1.APIResource, *restful.WebService, []error) {
func (a *APIInstaller) Install() ([]metav1.APIResource, []*storageversion.ResourceInfo, *restful.WebService, []error) {
var apiResources []metav1.APIResource
var resourceInfos []*storageversion.ResourceInfo
var errors []error
ws := a.newWebService()
@ -108,15 +110,18 @@ func (a *APIInstaller) Install() ([]metav1.APIResource, *restful.WebService, []e
}
sort.Strings(paths)
for _, path := range paths {
apiResource, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
apiResource, resourceInfo, err := a.registerResourceHandlers(path, a.group.Storage[path], ws)
if err != nil {
errors = append(errors, fmt.Errorf("error in registering resource: %s, %v", path, err))
}
if apiResource != nil {
apiResources = append(apiResources, *apiResource)
}
if resourceInfo != nil {
resourceInfos = append(resourceInfos, resourceInfo)
}
}
return apiResources, ws, errors
return apiResources, resourceInfos, ws, errors
}
// newWebService creates a new restful webservice with the api installer's prefix and version.
@ -182,7 +187,7 @@ func GetResourceKind(groupVersion schema.GroupVersion, storage rest.Storage, typ
return fqKindToRegister, nil
}
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, error) {
func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storage, ws *restful.WebService) (*metav1.APIResource, *storageversion.ResourceInfo, error) {
admit := a.group.Admit
optionsExternalVersion := a.group.GroupVersion
@ -192,19 +197,19 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
resource, subresource, err := splitSubresource(path)
if err != nil {
return nil, err
return nil, nil, err
}
group, version := a.group.GroupVersion.Group, a.group.GroupVersion.Version
fqKindToRegister, err := GetResourceKind(a.group.GroupVersion, storage, a.group.Typer)
if err != nil {
return nil, err
return nil, nil, err
}
versionedPtr, err := a.group.Creater.New(fqKindToRegister)
if err != nil {
return nil, err
return nil, nil, err
}
defaultVersionedObject := indirectArbitraryPointer(versionedPtr)
kind := fqKindToRegister.Kind
@ -215,18 +220,18 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
if isSubresource {
parentStorage, ok := a.group.Storage[resource]
if !ok {
return nil, fmt.Errorf("missing parent storage: %q", resource)
return nil, nil, fmt.Errorf("missing parent storage: %q", resource)
}
scoper, ok := parentStorage.(rest.Scoper)
if !ok {
return nil, fmt.Errorf("%q must implement scoper", resource)
return nil, nil, fmt.Errorf("%q must implement scoper", resource)
}
namespaceScoped = scoper.NamespaceScoped()
} else {
scoper, ok := storage.(rest.Scoper)
if !ok {
return nil, fmt.Errorf("%q must implement scoper", resource)
return nil, nil, fmt.Errorf("%q must implement scoper", resource)
}
namespaceScoped = scoper.NamespaceScoped()
}
@ -255,7 +260,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
versionedExportOptions, err := a.group.Creater.New(optionsExternalVersion.WithKind("ExportOptions"))
if err != nil {
return nil, err
return nil, nil, err
}
if isNamedCreater {
@ -267,30 +272,30 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
list := lister.NewList()
listGVKs, _, err := a.group.Typer.ObjectKinds(list)
if err != nil {
return nil, err
return nil, nil, err
}
versionedListPtr, err := a.group.Creater.New(a.group.GroupVersion.WithKind(listGVKs[0].Kind))
if err != nil {
return nil, err
return nil, nil, err
}
versionedList = indirectArbitraryPointer(versionedListPtr)
}
versionedListOptions, err := a.group.Creater.New(optionsExternalVersion.WithKind("ListOptions"))
if err != nil {
return nil, err
return nil, nil, err
}
versionedCreateOptions, err := a.group.Creater.New(optionsExternalVersion.WithKind("CreateOptions"))
if err != nil {
return nil, err
return nil, nil, err
}
versionedPatchOptions, err := a.group.Creater.New(optionsExternalVersion.WithKind("PatchOptions"))
if err != nil {
return nil, err
return nil, nil, err
}
versionedUpdateOptions, err := a.group.Creater.New(optionsExternalVersion.WithKind("UpdateOptions"))
if err != nil {
return nil, err
return nil, nil, err
}
var versionedDeleteOptions runtime.Object
@ -299,7 +304,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
if isGracefulDeleter {
versionedDeleteOptions, err = a.group.Creater.New(optionsExternalVersion.WithKind("DeleteOptions"))
if err != nil {
return nil, err
return nil, nil, err
}
versionedDeleterObject = indirectArbitraryPointer(versionedDeleteOptions)
@ -310,7 +315,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
versionedStatusPtr, err := a.group.Creater.New(optionsExternalVersion.WithKind("Status"))
if err != nil {
return nil, err
return nil, nil, err
}
versionedStatus := indirectArbitraryPointer(versionedStatusPtr)
var (
@ -323,14 +328,14 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
getOptions, getSubpath, _ = getterWithOptions.NewGetOptions()
getOptionsInternalKinds, _, err := a.group.Typer.ObjectKinds(getOptions)
if err != nil {
return nil, err
return nil, nil, err
}
getOptionsInternalKind = getOptionsInternalKinds[0]
versionedGetOptions, err = a.group.Creater.New(a.group.GroupVersion.WithKind(getOptionsInternalKind.Kind))
if err != nil {
versionedGetOptions, err = a.group.Creater.New(optionsExternalVersion.WithKind(getOptionsInternalKind.Kind))
if err != nil {
return nil, err
return nil, nil, err
}
}
isGetter = true
@ -340,7 +345,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
if isWatcher {
versionedWatchEventPtr, err := a.group.Creater.New(a.group.GroupVersion.WithKind("WatchEvent"))
if err != nil {
return nil, err
return nil, nil, err
}
versionedWatchEvent = indirectArbitraryPointer(versionedWatchEventPtr)
}
@ -356,7 +361,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
if connectOptions != nil {
connectOptionsInternalKinds, _, err := a.group.Typer.ObjectKinds(connectOptions)
if err != nil {
return nil, err
return nil, nil, err
}
connectOptionsInternalKind = connectOptionsInternalKinds[0]
@ -364,7 +369,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
if err != nil {
versionedConnectOptions, err = a.group.Creater.New(optionsExternalVersion.WithKind(connectOptionsInternalKind.Kind))
if err != nil {
return nil, err
return nil, nil, err
}
}
}
@ -388,7 +393,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
tableProvider, isTableProvider := storage.(rest.TableConvertor)
if isLister && !isTableProvider {
// All listers must implement TableProvider
return nil, fmt.Errorf("%q must implement TableConvertor", resource)
return nil, nil, fmt.Errorf("%q must implement TableConvertor", resource)
}
var apiResource metav1.APIResource
@ -398,7 +403,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
versioner := storageVersionProvider.StorageVersion()
gvk, err := getStorageVersionKind(versioner, storage, a.group.Typer)
if err != nil {
return nil, err
return nil, nil, err
}
apiResource.StorageVersionHash = discovery.StorageVersionHash(gvk.Group, gvk.Version, gvk.Kind)
}
@ -506,6 +511,29 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
}
}
var resourceInfo *storageversion.ResourceInfo
if utilfeature.DefaultFeatureGate.Enabled(features.StorageVersionAPI) &&
isStorageVersionProvider &&
storageVersionProvider.StorageVersion() != nil {
versioner := storageVersionProvider.StorageVersion()
encodingGVK, err := getStorageVersionKind(versioner, storage, a.group.Typer)
if err != nil {
return nil, nil, err
}
resourceInfo = &storageversion.ResourceInfo{
GroupResource: schema.GroupResource{
Group: a.group.GroupVersion.Group,
Resource: apiResource.Name,
},
EncodingVersion: encodingGVK.GroupVersion().String(),
// We record EquivalentResourceMapper first instead of calculate
// DecodableVersions immediately because API installation must
// be completed first for us to know equivalent APIs
EquivalentResourceMapper: a.group.EquivalentResourceRegistry,
}
}
// Create Routes for the actions.
// TODO: Add status documentation using Returns()
// Errors (see api/errors/errors.go as well as go-restful router):
@ -525,7 +553,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
for _, s := range a.group.Serializer.SupportedMediaTypes() {
if len(s.MediaTypeSubType) == 0 || len(s.MediaTypeType) == 0 {
return nil, fmt.Errorf("all serializers in the group Serializer must have MediaTypeType and MediaTypeSubType set: %s", s.MediaType)
return nil, nil, fmt.Errorf("all serializers in the group Serializer must have MediaTypeType and MediaTypeSubType set: %s", s.MediaType)
}
}
mediaTypes, streamMediaTypes := negotiation.MediaTypesForSerializer(a.group.Serializer)
@ -573,7 +601,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
isSubresource,
)
if err != nil {
return nil, fmt.Errorf("failed to create field manager: %v", err)
return nil, nil, fmt.Errorf("failed to create field manager: %v", err)
}
}
for _, action := range actions {
@ -608,7 +636,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
kubeVerbs[kubeVerb] = struct{}{}
}
} else {
return nil, fmt.Errorf("unknown action verb for discovery: %s", action.Verb)
return nil, nil, fmt.Errorf("unknown action verb for discovery: %s", action.Verb)
}
routes := []*restful.RouteBuilder{}
@ -617,12 +645,12 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
if isSubresource {
parentStorage, ok := a.group.Storage[resource]
if !ok {
return nil, fmt.Errorf("missing parent storage: %q", resource)
return nil, nil, fmt.Errorf("missing parent storage: %q", resource)
}
fqParentKind, err := GetResourceKind(a.group.GroupVersion, parentStorage, a.group.Typer)
if err != nil {
return nil, err
return nil, nil, err
}
kind = fqParentKind.Kind
}
@ -681,12 +709,12 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
Writes(producedObject)
if isGetterWithOptions {
if err := AddObjectParams(ws, route, versionedGetOptions); err != nil {
return nil, err
return nil, nil, err
}
}
if isExporter {
if err := AddObjectParams(ws, route, versionedExportOptions); err != nil {
return nil, err
return nil, nil, err
}
}
addParams(route, action.Params)
@ -708,7 +736,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
Returns(http.StatusOK, "OK", versionedList).
Writes(versionedList)
if err := AddObjectParams(ws, route, versionedListOptions); err != nil {
return nil, err
return nil, nil, err
}
switch {
case isLister && isWatcher:
@ -747,7 +775,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
Reads(defaultVersionedObject).
Writes(producedObject)
if err := AddObjectParams(ws, route, versionedUpdateOptions); err != nil {
return nil, err
return nil, nil, err
}
addParams(route, action.Params)
routes = append(routes, route)
@ -778,7 +806,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
Reads(metav1.Patch{}).
Writes(producedObject)
if err := AddObjectParams(ws, route, versionedPatchOptions); err != nil {
return nil, err
return nil, nil, err
}
addParams(route, action.Params)
routes = append(routes, route)
@ -811,7 +839,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
Reads(defaultVersionedObject).
Writes(producedObject)
if err := AddObjectParams(ws, route, versionedCreateOptions); err != nil {
return nil, err
return nil, nil, err
}
addParams(route, action.Params)
routes = append(routes, route)
@ -841,7 +869,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
route.Reads(versionedDeleterObject)
route.ParameterNamed("body").Required(false)
if err := AddObjectParams(ws, route, versionedDeleteOptions); err != nil {
return nil, err
return nil, nil, err
}
}
addParams(route, action.Params)
@ -866,11 +894,11 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
route.Reads(versionedDeleterObject)
route.ParameterNamed("body").Required(false)
if err := AddObjectParams(ws, route, versionedDeleteOptions); err != nil {
return nil, err
return nil, nil, err
}
}
if err := AddObjectParams(ws, route, versionedListOptions, "watch", "allowWatchBookmarks"); err != nil {
return nil, err
return nil, nil, err
}
addParams(route, action.Params)
routes = append(routes, route)
@ -893,7 +921,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
Returns(http.StatusOK, "OK", versionedWatchEvent).
Writes(versionedWatchEvent)
if err := AddObjectParams(ws, route, versionedListOptions); err != nil {
return nil, err
return nil, nil, err
}
addParams(route, action.Params)
routes = append(routes, route)
@ -916,7 +944,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
Returns(http.StatusOK, "OK", versionedWatchEvent).
Writes(versionedWatchEvent)
if err := AddObjectParams(ws, route, versionedListOptions); err != nil {
return nil, err
return nil, nil, err
}
addParams(route, action.Params)
routes = append(routes, route)
@ -943,7 +971,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
Writes(connectProducedObject)
if versionedConnectOptions != nil {
if err := AddObjectParams(ws, route, versionedConnectOptions); err != nil {
return nil, err
return nil, nil, err
}
}
addParams(route, action.Params)
@ -957,7 +985,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
}
}
default:
return nil, fmt.Errorf("unrecognized action verb: %s", action.Verb)
return nil, nil, fmt.Errorf("unrecognized action verb: %s", action.Verb)
}
for _, route := range routes {
route.Metadata(ROUTE_META_GVK, metav1.GroupVersionKind{
@ -993,7 +1021,7 @@ func (a *APIInstaller) registerResourceHandlers(path string, storage rest.Storag
// Record the existence of the GVR and the corresponding GVK
a.group.EquivalentResourceRegistry.RegisterKindFor(reqScope.Resource, reqScope.Subresource, fqKindToRegister)
return &apiResource, nil
return &apiResource, resourceInfo, nil
}
// indirectArbitraryPointer returns *ptrToObject for an arbitrary pointer

View File

@ -45,6 +45,7 @@ import (
"k8s.io/apiserver/pkg/registry/rest"
"k8s.io/apiserver/pkg/server/healthz"
"k8s.io/apiserver/pkg/server/routes"
"k8s.io/apiserver/pkg/storageversion"
utilfeature "k8s.io/apiserver/pkg/util/feature"
utilopenapi "k8s.io/apiserver/pkg/util/openapi"
restclient "k8s.io/client-go/rest"
@ -199,6 +200,9 @@ type GenericAPIServer struct {
// APIServerID is the ID of this API server
APIServerID string
// StorageVersionManager holds the storage versions of the API resources installed by this server.
StorageVersionManager storageversion.Manager
}
// DelegationTarget is an interface which allows for composition of API servers with top level handling that works
@ -409,6 +413,7 @@ func (s preparedGenericAPIServer) NonBlockingRun(stopCh <-chan struct{}) (<-chan
// installAPIResources is a private method for installing the REST storage backing each api groupversionresource
func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *APIGroupInfo, openAPIModels openapiproto.Models) error {
var resourceInfos []*storageversion.ResourceInfo
for _, groupVersion := range apiGroupInfo.PrioritizedVersions {
if len(apiGroupInfo.VersionedResourcesStorageMap[groupVersion.Version]) == 0 {
klog.Warningf("Skipping API %v because it has no resources.", groupVersion)
@ -431,9 +436,18 @@ func (s *GenericAPIServer) installAPIResources(apiPrefix string, apiGroupInfo *A
apiGroupVersion.MaxRequestBodyBytes = s.maxRequestBodyBytes
if err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer); err != nil {
r, err := apiGroupVersion.InstallREST(s.Handler.GoRestfulContainer)
if err != nil {
return fmt.Errorf("unable to setup API %v: %v", apiGroupInfo, err)
}
resourceInfos = append(resourceInfos, r...)
}
if utilfeature.DefaultFeatureGate.Enabled(features.StorageVersionAPI) {
// API installation happens before we start listening on the handlers,
// therefore it is safe to register ResourceInfos here. The handler will block
// write requests until the storage versions of the targeting resources are updated.
s.StorageVersionManager.AddResourceInfo(resourceInfos...)
}
return nil

View File

@ -18,6 +18,7 @@ package storageversion
import (
"fmt"
"sort"
"sync"
"sync/atomic"
@ -98,7 +99,10 @@ func (s *defaultManager) AddResourceInfo(resources ...*ResourceInfo) {
func (s *defaultManager) addPendingManagedStatusLocked(r *ResourceInfo) {
gvrs := r.EquivalentResourceMapper.EquivalentResourcesFor(r.GroupResource.WithVersion(""), "")
for _, gvr := range gvrs {
s.managedStatus[gvr.GroupResource()] = &updateStatus{}
gr := gvr.GroupResource()
if _, ok := s.managedStatus[gr]; !ok {
s.managedStatus[gr] = &updateStatus{}
}
}
}
@ -112,22 +116,37 @@ func (s *defaultManager) UpdateStorageVersions(kubeAPIServerClientConfig *rest.C
sc := clientset.InternalV1alpha1().StorageVersions()
s.mu.RLock()
resources := []*ResourceInfo{}
resources := []ResourceInfo{}
for resource := range s.managedResourceInfos {
resources = append(resources, resource)
resources = append(resources, *resource)
}
s.mu.RUnlock()
hasFailure := false
for _, r := range resources {
// Sorting the list to make sure we have a consistent dedup result, and
// therefore avoid creating unnecessarily duplicated StorageVersion objects.
// For example, extensions.ingresses and networking.k8s.io.ingresses share
// the same underlying storage. Without sorting, in an HA cluster, one
// apiserver may dedup and update StorageVersion for extensions.ingresses,
// while another apiserver may dedup and update StorageVersion for
// networking.k8s.io.ingresses. The storage migrator (which migrates objects
// per GroupResource) will migrate these resources twice, since both
// StorageVersion objects have CommonEncodingVersion (each with one server registered).
sortResourceInfosByGroupResource(resources)
for _, r := range dedupResourceInfos(resources) {
dv := decodableVersions(r.EquivalentResourceMapper, r.GroupResource)
if err := updateStorageVersionFor(sc, serverID, r.GroupResource, r.EncodingVersion, dv); err != nil {
gr := r.GroupResource
// Group must be a valid subdomain in DNS (RFC 1123)
if len(gr.Group) == 0 {
gr.Group = "core"
}
if err := updateStorageVersionFor(sc, serverID, gr, r.EncodingVersion, dv); err != nil {
utilruntime.HandleError(fmt.Errorf("failed to update storage version for %v: %v", r.GroupResource, err))
s.recordStatusFailure(r, err)
s.recordStatusFailure(&r, err)
hasFailure = true
continue
}
klog.V(2).Infof("successfully updated storage version for %v", r.GroupResource)
s.recordStatusSuccess(r)
s.recordStatusSuccess(&r)
}
if hasFailure {
return
@ -136,6 +155,45 @@ func (s *defaultManager) UpdateStorageVersions(kubeAPIServerClientConfig *rest.C
s.setComplete()
}
// dedupResourceInfos dedups ResourceInfos with the same underlying storage.
// ResourceInfos from the same Group with different Versions share the same underlying storage.
// ResourceInfos from different Groups may share the same underlying storage, e.g.
// networking.k8s.io ingresses and extensions ingresses. The StorageVersion manager
// only needs to update one StorageVersion for the equivalent Groups.
func dedupResourceInfos(infos []ResourceInfo) []ResourceInfo {
var ret []ResourceInfo
seen := make(map[schema.GroupResource]struct{})
for _, info := range infos {
gr := info.GroupResource
if _, ok := seen[gr]; ok {
continue
}
gvrs := info.EquivalentResourceMapper.EquivalentResourcesFor(gr.WithVersion(""), "")
for _, gvr := range gvrs {
seen[gvr.GroupResource()] = struct{}{}
}
ret = append(ret, info)
}
return ret
}
func sortResourceInfosByGroupResource(infos []ResourceInfo) {
sort.Sort(byGroupResource(infos))
}
type byGroupResource []ResourceInfo
func (s byGroupResource) Len() int { return len(s) }
func (s byGroupResource) Less(i, j int) bool {
if s[i].GroupResource.Group == s[j].GroupResource.Group {
return s[i].GroupResource.Resource < s[j].GroupResource.Resource
}
return s[i].GroupResource.Group < s[j].GroupResource.Group
}
func (s byGroupResource) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
// recordStatusSuccess marks updated ResourceInfo as completed.
func (s *defaultManager) recordStatusSuccess(r *ResourceInfo) {
s.mu.Lock()

View File

@ -0,0 +1,62 @@
/*
Copyright 2020 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package storageversion
import (
"reflect"
"testing"
"github.com/google/go-cmp/cmp"
"k8s.io/apimachinery/pkg/runtime/schema"
)
func TestSortResourceInfosByGroupResource(t *testing.T) {
tests := []struct {
infos []ResourceInfo
expected []ResourceInfo
}{
{
infos: nil,
expected: nil,
},
{
infos: []ResourceInfo{},
expected: []ResourceInfo{},
},
{
infos: []ResourceInfo{
{GroupResource: schema.GroupResource{Group: "", Resource: "pods"}},
{GroupResource: schema.GroupResource{Group: "", Resource: "nodes"}},
{GroupResource: schema.GroupResource{Group: "networking.k8s.io", Resource: "ingresses"}},
{GroupResource: schema.GroupResource{Group: "extensions", Resource: "ingresses"}},
},
expected: []ResourceInfo{
{GroupResource: schema.GroupResource{Group: "", Resource: "nodes"}},
{GroupResource: schema.GroupResource{Group: "", Resource: "pods"}},
{GroupResource: schema.GroupResource{Group: "extensions", Resource: "ingresses"}},
{GroupResource: schema.GroupResource{Group: "networking.k8s.io", Resource: "ingresses"}},
},
},
}
for _, tc := range tests {
sortResourceInfosByGroupResource(tc.infos)
if e, a := tc.expected, tc.infos; !reflect.DeepEqual(e, a) {
t.Errorf("unexpected: %v", cmp.Diff(e, a))
}
}
}