diff --git a/staging/src/k8s.io/apiserver/pkg/storageversion/BUILD b/staging/src/k8s.io/apiserver/pkg/storageversion/BUILD new file mode 100644 index 00000000000..b398af31bce --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storageversion/BUILD @@ -0,0 +1,39 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = ["updater.go"], + importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storageversion", + importpath = "k8s.io/apiserver/pkg/storageversion", + visibility = ["//visibility:public"], + deps = [ + "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/apis/apiserverinternal/v1alpha1:go_default_library", + "//vendor/k8s.io/klog:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = ["updater_test.go"], + embed = [":go_default_library"], + deps = [ + "//staging/src/k8s.io/apiserver/pkg/apis/apiserverinternal/v1alpha1:go_default_library", + "//vendor/github.com/google/go-cmp/cmp:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/staging/src/k8s.io/apiserver/pkg/storageversion/manager.go b/staging/src/k8s.io/apiserver/pkg/storageversion/manager.go new file mode 100644 index 00000000000..1d48fc47616 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storageversion/manager.go @@ -0,0 +1,194 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storageversion + +import ( + "sync" + "sync/atomic" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + apiserverclientset "k8s.io/apiserver/pkg/client/clientset_generated/clientset" + "k8s.io/client-go/rest" + _ "k8s.io/component-base/metrics/prometheus/workqueue" // for workqueue metric registration + "k8s.io/klog" +) + +// ResourceInfo contains the information to register the resource to the +// storage version API. +type ResourceInfo struct { + Resource metav1.APIResource + // We use a standalone Group instead of reusing the Resource.Group + // because Resource.Group is often omitted, see the comment on + // Resource.Group for why it's omitted. + Group string + EncodingVersion string + DecodableVersions []string + EquivalentResourceMapper runtime.EquivalentResourceRegistry +} + +// Manager records the resources whose StorageVersions need updates, and provides a method to update those StorageVersions. +type Manager interface { + // AddResourceInfo adds ResourceInfo to the manager. + AddResourceInfo(resources ...*ResourceInfo) + // RemoveResourceInfo removes ResourceInfo from the manager. + RemoveResourceInfo(r *ResourceInfo) + // UpdatesPending returns if the StorageVersion of a resource is still wait to be updated. + UpdatesPending(group, resource string) bool + + // UpdateStorageVersions updates the StorageVersions. + UpdateStorageVersions(loopbackClientConfig *rest.Config, apiserverID string) + // Completed returns if updating StorageVersions has completed. + Completed() bool +} + +var _ Manager = &DefaultManager{} + +// NewDefaultManager creates a new DefaultManager. +func NewDefaultManager() *DefaultManager { + s := &DefaultManager{} + s.completed.Store(false) + s.groupResources = make(map[string]map[string]struct{}) + s.resources = make(map[*ResourceInfo]struct{}) + return s +} + +// AddResourceInfo adds ResourceInfo to the manager. +// This is not thread-safe. It is expected to be called when the apiserver is installing the endpoints, which is done serially. +func (s *DefaultManager) AddResourceInfo(resources ...*ResourceInfo) { + for _, r := range resources { + s.resources[r] = struct{}{} + s.addGroupResourceFor(r) + } +} + +func (s *DefaultManager) addGroupResourceFor(r *ResourceInfo) { + gvrs := r.EquivalentResourceMapper.EquivalentResourcesFor(schema.GroupVersionResource{ + Group: r.Group, + Resource: r.Resource.Name, + }, "") + for _, gvr := range gvrs { + s.addGroupResource(gvr.Group, gvr.Resource) + } +} + +func (s *DefaultManager) addGroupResource(group, resource string) { + if _, ok := s.groupResources[group]; !ok { + s.groupResources[group] = make(map[string]struct{}) + } + s.groupResources[group][resource] = struct{}{} +} + +// RemoveResourceInfo removes ResourceInfo from the manager. +// It is not safe to call this function concurrently with AddResourceInfo. +func (s *DefaultManager) RemoveResourceInfo(r *ResourceInfo) { + s.mu.Lock() + defer s.mu.Unlock() + delete(s.resources, r) + s.removeGroupResourceFor(r) +} + +func (s *DefaultManager) removeGroupResourceFor(r *ResourceInfo) { + gvrs := r.EquivalentResourceMapper.EquivalentResourcesFor(schema.GroupVersionResource{ + Group: r.Group, + Resource: r.Resource.Name, + }, "") + for _, gvr := range gvrs { + s.removeGroupResource(gvr.Group, gvr.Version) + } +} + +func (s *DefaultManager) removeGroupResource(group, resource string) { + if _, ok := s.groupResources[group]; !ok { + return + } + delete(s.groupResources[group], resource) + if len(s.groupResources[group]) == 0 { + delete(s.groupResources, group) + } +} + +// UpdatesPending returns if the StorageVersion of a resource is still wait to be updated. +func (s *DefaultManager) UpdatesPending(group, resource string) bool { + s.mu.RLock() + defer s.mu.RUnlock() + if _, ok := s.groupResources[group]; !ok { + return false + } + _, ok := s.groupResources[group][resource] + return ok +} + +// DefaultManager indicates if the aggregator, kube-apiserver, and the +// apiextensions-apiserver have completed reporting their storage versions. +type DefaultManager struct { + completed atomic.Value + + mu sync.RWMutex + resources map[*ResourceInfo]struct{} + groupResources map[string]map[string]struct{} +} + +// setComplete marks the completion of updating StorageVersions. No write requests need to be blocked anymore. +func (s *DefaultManager) setComplete() { + s.completed.Store(true) +} + +// Completed returns if updating StorageVersions has completed. +func (s *DefaultManager) Completed() bool { + return s.completed.Load().(bool) +} + +func decodableVersions(e runtime.EquivalentResourceRegistry, group string, resource string) []string { + var versions []string + decodingGVRs := e.EquivalentResourcesFor(schema.GroupVersionResource{ + Group: group, + Resource: resource, + }, "") + for _, v := range decodingGVRs { + versions = append(versions, v.GroupVersion().String()) + } + return versions +} + +// UpdateStorageVersions updates the StorageVersions. If the updates are +// successful, following calls to Completed() returns true. +func (s *DefaultManager) UpdateStorageVersions(loopbackClientConfig *rest.Config, serverID string) { + cfg := rest.AddUserAgent(loopbackClientConfig, "system:kube-apiserver") + clientset, err := apiserverclientset.NewForConfig(cfg) + if err != nil { + klog.Fatalf("failed to get clientset: %v", err) + return + } + sc := clientset.InternalV1alpha1().StorageVersions() + + s.mu.RLock() + resources := s.resources + s.mu.RUnlock() + for r := range resources { + r.DecodableVersions = decodableVersions(r.EquivalentResourceMapper, r.Group, r.Resource.Name) + if err := updateStorageVersionFor(sc, serverID, r.Group+"."+r.Resource.Name, r.EncodingVersion, r.DecodableVersions); err != nil { + klog.Fatalf("failed to update storage version for %v", r.Resource.Name) + return + } + klog.V(2).Infof("successfully updated storage version for %v", r.Resource.Name) + s.RemoveResourceInfo(r) + } + klog.V(2).Infof("storage version updates complete") + s.setComplete() +} diff --git a/staging/src/k8s.io/apiserver/pkg/storageversion/updater.go b/staging/src/k8s.io/apiserver/pkg/storageversion/updater.go new file mode 100644 index 00000000000..9e3a80b7de8 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storageversion/updater.go @@ -0,0 +1,116 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storageversion + +import ( + "context" + "time" + + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apiserver/pkg/apis/apiserverinternal/v1alpha1" + "k8s.io/klog" +) + +// Client has the methods required to update the storage version. +type Client interface { + Create(context.Context, *v1alpha1.StorageVersion, metav1.CreateOptions) (*v1alpha1.StorageVersion, error) + Update(context.Context, *v1alpha1.StorageVersion, metav1.UpdateOptions) (*v1alpha1.StorageVersion, error) + Get(context.Context, string, metav1.GetOptions) (*v1alpha1.StorageVersion, error) +} + +func setAgreedEncodingVersion(sv *v1alpha1.StorageVersion) { + if len(sv.Status.ServerStorageVersions) == 0 { + return + } + firstVersion := sv.Status.ServerStorageVersions[0].EncodingVersion + agreed := true + for _, ssv := range sv.Status.ServerStorageVersions { + if ssv.EncodingVersion != firstVersion { + agreed = false + } + } + if agreed { + sv.Status.AgreedEncodingVersion = &firstVersion + } else { + sv.Status.AgreedEncodingVersion = nil + } +} + +// updateStorageVersionFor updates the storage version object for the resource. +// resource is of the format ".". +// TODO: split the resource parameter to two. +func updateStorageVersionFor(c Client, apiserverID string, resource string, encodingVersion string, decodableVersions []string) error { + retries := 3 + var retry int + var err error + for retry < retries { + err = singleUpdate(c, apiserverID, resource, encodingVersion, decodableVersions) + if err == nil { + return nil + } + if apierrors.IsAlreadyExists(err) || apierrors.IsConflict(err) { + time.Sleep(1 * time.Second) + continue + } + if err != nil { + klog.Errorf("retry %d, failed to update storage version for %s: %v", retry, resource, err) + retry++ + time.Sleep(1 * time.Second) + } + } + return err +} + +func singleUpdate(c Client, apiserverID, resource, encodingVersion string, decodableVersions []string) error { + shouldCreate := false + sv, err := c.Get(context.TODO(), resource, metav1.GetOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return err + } + if err != nil && apierrors.IsNotFound(err) { + shouldCreate = true + sv = &v1alpha1.StorageVersion{} + sv.ObjectMeta.Name = resource + } + localUpdateStorageVersion(sv, apiserverID, encodingVersion, decodableVersions) + if shouldCreate { + _, err := c.Create(context.TODO(), sv, metav1.CreateOptions{}) + return err + } + _, err = c.Update(context.TODO(), sv, metav1.UpdateOptions{}) + return err +} + +func localUpdateStorageVersion(sv *v1alpha1.StorageVersion, apiserverID, encodingVersion string, decodableVersions []string) { + newSSV := v1alpha1.ServerStorageVersion{ + APIServerID: apiserverID, + EncodingVersion: encodingVersion, + DecodableVersions: decodableVersions, + } + foundSSV := false + for i, ssv := range sv.Status.ServerStorageVersions { + if ssv.APIServerID == apiserverID { + sv.Status.ServerStorageVersions[i] = newSSV + foundSSV = true + } + } + if !foundSSV { + sv.Status.ServerStorageVersions = append(sv.Status.ServerStorageVersions, newSSV) + } + setAgreedEncodingVersion(sv) +} diff --git a/staging/src/k8s.io/apiserver/pkg/storageversion/updater_test.go b/staging/src/k8s.io/apiserver/pkg/storageversion/updater_test.go new file mode 100644 index 00000000000..710e21e1c96 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storageversion/updater_test.go @@ -0,0 +1,102 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storageversion + +import ( + "reflect" + "testing" + + "github.com/google/go-cmp/cmp" + "k8s.io/apiserver/pkg/apis/apiserverinternal/v1alpha1" +) + +func TestLocalUpdateStorageVersion(t *testing.T) { + v1 := "v1" + ssv1 := v1alpha1.ServerStorageVersion{ + APIServerID: "1", + EncodingVersion: "v1", + DecodableVersions: []string{"v1", "v2"}, + } + ssv2 := v1alpha1.ServerStorageVersion{ + APIServerID: "2", + EncodingVersion: "v1", + DecodableVersions: []string{"v1", "v2"}, + } + // ssv3 has a different encoding version + ssv3 := v1alpha1.ServerStorageVersion{ + APIServerID: "3", + EncodingVersion: "v2", + DecodableVersions: []string{"v1", "v2"}, + } + ssv4 := v1alpha1.ServerStorageVersion{ + APIServerID: "4", + EncodingVersion: "v1", + DecodableVersions: []string{"v1", "v2", "v4"}, + } + tests := []struct { + old v1alpha1.StorageVersionStatus + newSSV v1alpha1.ServerStorageVersion + expected v1alpha1.StorageVersionStatus + }{ + { + old: v1alpha1.StorageVersionStatus{}, + newSSV: ssv1, + expected: v1alpha1.StorageVersionStatus{ + ServerStorageVersions: []v1alpha1.ServerStorageVersion{ssv1}, + AgreedEncodingVersion: &v1, + }, + }, + { + old: v1alpha1.StorageVersionStatus{ + ServerStorageVersions: []v1alpha1.ServerStorageVersion{ssv1, ssv2}, + AgreedEncodingVersion: &v1, + }, + newSSV: ssv3, + expected: v1alpha1.StorageVersionStatus{ + ServerStorageVersions: []v1alpha1.ServerStorageVersion{ssv1, ssv2, ssv3}, + }, + }, + { + old: v1alpha1.StorageVersionStatus{ + ServerStorageVersions: []v1alpha1.ServerStorageVersion{ssv1, ssv2}, + AgreedEncodingVersion: &v1, + }, + newSSV: ssv4, + expected: v1alpha1.StorageVersionStatus{ + ServerStorageVersions: []v1alpha1.ServerStorageVersion{ssv1, ssv2, ssv4}, + AgreedEncodingVersion: &v1, + }, + }, + { + old: v1alpha1.StorageVersionStatus{ + ServerStorageVersions: []v1alpha1.ServerStorageVersion{ssv1, ssv2, ssv3}, + }, + newSSV: ssv4, + expected: v1alpha1.StorageVersionStatus{ + ServerStorageVersions: []v1alpha1.ServerStorageVersion{ssv1, ssv2, ssv3, ssv4}, + }, + }, + } + + for _, tc := range tests { + sv := &v1alpha1.StorageVersion{Status: tc.old} + localUpdateStorageVersion(sv, tc.newSSV.APIServerID, tc.newSSV.EncodingVersion, tc.newSSV.DecodableVersions) + if e, a := tc.expected, sv.Status; !reflect.DeepEqual(e, a) { + t.Errorf("unexpected: %v", cmp.Diff(e, a)) + } + } +}