diff --git a/staging/src/k8s.io/apiserver/BUILD b/staging/src/k8s.io/apiserver/BUILD index dd2157587e7..1628d93dbb3 100644 --- a/staging/src/k8s.io/apiserver/BUILD +++ b/staging/src/k8s.io/apiserver/BUILD @@ -41,6 +41,7 @@ filegroup( "//staging/src/k8s.io/apiserver/pkg/registry:all-srcs", "//staging/src/k8s.io/apiserver/pkg/server:all-srcs", "//staging/src/k8s.io/apiserver/pkg/storage:all-srcs", + "//staging/src/k8s.io/apiserver/pkg/storageversion:all-srcs", "//staging/src/k8s.io/apiserver/pkg/util/apihelpers:all-srcs", "//staging/src/k8s.io/apiserver/pkg/util/cache:all-srcs", "//staging/src/k8s.io/apiserver/pkg/util/dryrun:all-srcs", diff --git a/staging/src/k8s.io/apiserver/pkg/storageversion/BUILD b/staging/src/k8s.io/apiserver/pkg/storageversion/BUILD new file mode 100644 index 00000000000..98cc5837204 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storageversion/BUILD @@ -0,0 +1,48 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "go_default_library", + srcs = [ + "manager.go", + "updater.go", + ], + importmap = "k8s.io/kubernetes/vendor/k8s.io/apiserver/pkg/storageversion", + importpath = "k8s.io/apiserver/pkg/storageversion", + visibility = ["//visibility:public"], + deps = [ + "//staging/src/k8s.io/api/apiserverinternal/v1alpha1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/runtime/schema:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes:go_default_library", + "//staging/src/k8s.io/client-go/rest:go_default_library", + "//staging/src/k8s.io/component-base/metrics/prometheus/workqueue:go_default_library", + "//vendor/k8s.io/klog/v2:go_default_library", + ], +) + +go_test( + name = "go_default_test", + srcs = ["updater_test.go"], + embed = [":go_default_library"], + deps = [ + "//staging/src/k8s.io/api/apiserverinternal/v1alpha1:go_default_library", + "//vendor/github.com/google/go-cmp/cmp:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/staging/src/k8s.io/apiserver/pkg/storageversion/OWNERS b/staging/src/k8s.io/apiserver/pkg/storageversion/OWNERS new file mode 100644 index 00000000000..ca9aa135841 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storageversion/OWNERS @@ -0,0 +1,5 @@ +# See the OWNERS docs at https://go.k8s.io/owners + +approvers: +- caesarxuchao +- roycaihw diff --git a/staging/src/k8s.io/apiserver/pkg/storageversion/manager.go b/staging/src/k8s.io/apiserver/pkg/storageversion/manager.go new file mode 100644 index 00000000000..5903eac9379 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storageversion/manager.go @@ -0,0 +1,219 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storageversion + +import ( + "fmt" + "sync" + "sync/atomic" + + "k8s.io/apimachinery/pkg/runtime" + "k8s.io/apimachinery/pkg/runtime/schema" + utilruntime "k8s.io/apimachinery/pkg/util/runtime" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + _ "k8s.io/component-base/metrics/prometheus/workqueue" // for workqueue metric registration + "k8s.io/klog/v2" +) + +// ResourceInfo contains the information to register the resource to the +// storage version API. +type ResourceInfo struct { + GroupResource schema.GroupResource + + EncodingVersion string + // Used to calculate decodable versions. Can only be used after all + // equivalent versions are registered by InstallREST. + EquivalentResourceMapper runtime.EquivalentResourceRegistry +} + +// Manager records the resources whose StorageVersions need updates, and provides a method to update those StorageVersions. +type Manager interface { + // AddResourceInfo records resources whose StorageVersions need updates + AddResourceInfo(resources ...*ResourceInfo) + // UpdateStorageVersions tries to update the StorageVersions of the recorded resources + UpdateStorageVersions(kubeAPIServerClientConfig *rest.Config, apiserverID string) + // PendingUpdate returns true if the StorageVersion of the given resource is still pending update. + PendingUpdate(gr schema.GroupResource) bool + // LastUpdateError returns the last error hit when updating the storage version of the given resource. + LastUpdateError(gr schema.GroupResource) error + // Completed returns true if updating StorageVersions of all recorded resources has completed. + Completed() bool +} + +var _ Manager = &defaultManager{} + +// defaultManager indicates if an apiserver has completed reporting its storage versions. +type defaultManager struct { + completed atomic.Value + + mu sync.RWMutex + // managedResourceInfos records the ResourceInfos whose StorageVersions will get updated in the next + // UpdateStorageVersions call + managedResourceInfos map[*ResourceInfo]struct{} + // managedStatus records the update status of StorageVersion for each GroupResource. Since one + // ResourceInfo may expand into multiple GroupResource (e.g. ingresses.networking.k8s.io and ingresses.extensions), + // this map allows quick status lookup for a GroupResource, during API request handling. + managedStatus map[schema.GroupResource]*updateStatus +} + +type updateStatus struct { + done bool + lastErr error +} + +// NewDefaultManager creates a new defaultManager. +func NewDefaultManager() Manager { + s := &defaultManager{} + s.completed.Store(false) + s.managedResourceInfos = make(map[*ResourceInfo]struct{}) + s.managedStatus = make(map[schema.GroupResource]*updateStatus) + return s +} + +// AddResourceInfo adds ResourceInfo to the manager. +func (s *defaultManager) AddResourceInfo(resources ...*ResourceInfo) { + s.mu.Lock() + defer s.mu.Unlock() + for _, r := range resources { + s.managedResourceInfos[r] = struct{}{} + s.addPendingManagedStatusLocked(r) + } +} + +func (s *defaultManager) addPendingManagedStatusLocked(r *ResourceInfo) { + gvrs := r.EquivalentResourceMapper.EquivalentResourcesFor(r.GroupResource.WithVersion(""), "") + for _, gvr := range gvrs { + s.managedStatus[gvr.GroupResource()] = &updateStatus{} + } +} + +// UpdateStorageVersions tries to update the StorageVersions of the recorded resources +func (s *defaultManager) UpdateStorageVersions(kubeAPIServerClientConfig *rest.Config, serverID string) { + clientset, err := kubernetes.NewForConfig(kubeAPIServerClientConfig) + if err != nil { + utilruntime.HandleError(fmt.Errorf("failed to get clientset: %v", err)) + return + } + sc := clientset.InternalV1alpha1().StorageVersions() + + s.mu.RLock() + resources := make([]*ResourceInfo, len(s.managedResourceInfos)) + for resource := range s.managedResourceInfos { + resources = append(resources, resource) + } + s.mu.RUnlock() + hasFailure := false + for _, r := range resources { + dv := decodableVersions(r.EquivalentResourceMapper, r.GroupResource) + if err := updateStorageVersionFor(sc, serverID, r.GroupResource, r.EncodingVersion, dv); err != nil { + utilruntime.HandleError(fmt.Errorf("failed to update storage version for %v: %v", r.GroupResource, err)) + s.recordStatusFailure(r, err) + hasFailure = true + continue + } + klog.V(2).Infof("successfully updated storage version for %v", r.GroupResource) + s.recordStatusSuccess(r) + } + if hasFailure { + return + } + klog.V(2).Infof("storage version updates complete") + s.setComplete() +} + +// recordStatusSuccess marks updated ResourceInfo as completed. +func (s *defaultManager) recordStatusSuccess(r *ResourceInfo) { + s.mu.Lock() + defer s.mu.Unlock() + s.recordStatusSuccessLocked(r) +} + +func (s *defaultManager) recordStatusSuccessLocked(r *ResourceInfo) { + gvrs := r.EquivalentResourceMapper.EquivalentResourcesFor(r.GroupResource.WithVersion(""), "") + for _, gvr := range gvrs { + s.recordSuccessGroupResourceLocked(gvr.GroupResource()) + } +} + +func (s *defaultManager) recordSuccessGroupResourceLocked(gr schema.GroupResource) { + if _, ok := s.managedStatus[gr]; !ok { + return + } + s.managedStatus[gr].done = true + s.managedStatus[gr].lastErr = nil +} + +// recordStatusFailure records latest error updating ResourceInfo. +func (s *defaultManager) recordStatusFailure(r *ResourceInfo, err error) { + s.mu.Lock() + defer s.mu.Unlock() + s.recordStatusFailureLocked(r, err) +} + +func (s *defaultManager) recordStatusFailureLocked(r *ResourceInfo, err error) { + gvrs := r.EquivalentResourceMapper.EquivalentResourcesFor(r.GroupResource.WithVersion(""), "") + for _, gvr := range gvrs { + s.recordErrorGroupResourceLocked(gvr.GroupResource(), err) + } +} + +func (s *defaultManager) recordErrorGroupResourceLocked(gr schema.GroupResource, err error) { + if _, ok := s.managedStatus[gr]; !ok { + return + } + s.managedStatus[gr].lastErr = err +} + +// PendingUpdate returns if the StorageVersion of a resource is still wait to be updated. +func (s *defaultManager) PendingUpdate(gr schema.GroupResource) bool { + s.mu.RLock() + defer s.mu.RUnlock() + if _, ok := s.managedStatus[gr]; !ok { + return false + } + return !s.managedStatus[gr].done +} + +// LastUpdateError returns the last error hit when updating the storage version of the given resource. +func (s *defaultManager) LastUpdateError(gr schema.GroupResource) error { + s.mu.RLock() + defer s.mu.RUnlock() + if _, ok := s.managedStatus[gr]; !ok { + return fmt.Errorf("couldn't find managed status for %v", gr) + } + return s.managedStatus[gr].lastErr +} + +// setComplete marks the completion of updating StorageVersions. No write requests need to be blocked anymore. +func (s *defaultManager) setComplete() { + s.completed.Store(true) +} + +// Completed returns if updating StorageVersions has completed. +func (s *defaultManager) Completed() bool { + return s.completed.Load().(bool) +} + +func decodableVersions(e runtime.EquivalentResourceRegistry, gr schema.GroupResource) []string { + var versions []string + decodingGVRs := e.EquivalentResourcesFor(gr.WithVersion(""), "") + for _, v := range decodingGVRs { + versions = append(versions, v.GroupVersion().String()) + } + return versions +} diff --git a/staging/src/k8s.io/apiserver/pkg/storageversion/updater.go b/staging/src/k8s.io/apiserver/pkg/storageversion/updater.go new file mode 100644 index 00000000000..110accf22da --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storageversion/updater.go @@ -0,0 +1,122 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storageversion + +import ( + "context" + "fmt" + "time" + + "k8s.io/api/apiserverinternal/v1alpha1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/klog/v2" +) + +// Client has the methods required to update the storage version. +type Client interface { + Create(context.Context, *v1alpha1.StorageVersion, metav1.CreateOptions) (*v1alpha1.StorageVersion, error) + Update(context.Context, *v1alpha1.StorageVersion, metav1.UpdateOptions) (*v1alpha1.StorageVersion, error) + Get(context.Context, string, metav1.GetOptions) (*v1alpha1.StorageVersion, error) +} + +func setCommonEncodingVersion(sv *v1alpha1.StorageVersion) { + if len(sv.Status.StorageVersions) == 0 { + return + } + firstVersion := sv.Status.StorageVersions[0].EncodingVersion + agreed := true + for _, ssv := range sv.Status.StorageVersions { + if ssv.EncodingVersion != firstVersion { + agreed = false + break + } + } + if agreed { + sv.Status.CommonEncodingVersion = &firstVersion + } else { + sv.Status.CommonEncodingVersion = nil + } +} + +// updateStorageVersionFor updates the storage version object for the resource. +func updateStorageVersionFor(c Client, apiserverID string, gr schema.GroupResource, encodingVersion string, decodableVersions []string) error { + retries := 3 + var retry int + var err error + for retry < retries { + err = singleUpdate(c, apiserverID, gr, encodingVersion, decodableVersions) + if err == nil { + return nil + } + if apierrors.IsAlreadyExists(err) || apierrors.IsConflict(err) { + time.Sleep(1 * time.Second) + continue + } + if err != nil { + klog.Errorf("retry %d, failed to update storage version for %v: %v", retry, gr, err) + retry++ + time.Sleep(1 * time.Second) + } + } + return err +} + +func singleUpdate(c Client, apiserverID string, gr schema.GroupResource, encodingVersion string, decodableVersions []string) error { + shouldCreate := false + name := fmt.Sprintf("%s.%s", gr.Group, gr.Resource) + sv, err := c.Get(context.TODO(), name, metav1.GetOptions{}) + if err != nil && !apierrors.IsNotFound(err) { + return err + } + if apierrors.IsNotFound(err) { + shouldCreate = true + sv = &v1alpha1.StorageVersion{} + sv.ObjectMeta.Name = name + } + updatedSV := localUpdateStorageVersion(sv, apiserverID, encodingVersion, decodableVersions) + if shouldCreate { + _, err := c.Create(context.TODO(), updatedSV, metav1.CreateOptions{}) + return err + } + _, err = c.Update(context.TODO(), updatedSV, metav1.UpdateOptions{}) + return err +} + +// localUpdateStorageVersion updates the input storageversion with given server storageversion info. +// The function updates the input storageversion in place. +func localUpdateStorageVersion(sv *v1alpha1.StorageVersion, apiserverID, encodingVersion string, decodableVersions []string) *v1alpha1.StorageVersion { + newSSV := v1alpha1.ServerStorageVersion{ + APIServerID: apiserverID, + EncodingVersion: encodingVersion, + DecodableVersions: decodableVersions, + } + foundSSV := false + for i, ssv := range sv.Status.StorageVersions { + if ssv.APIServerID == apiserverID { + sv.Status.StorageVersions[i] = newSSV + foundSSV = true + break + } + } + if !foundSSV { + sv.Status.StorageVersions = append(sv.Status.StorageVersions, newSSV) + } + setCommonEncodingVersion(sv) + return sv +} diff --git a/staging/src/k8s.io/apiserver/pkg/storageversion/updater_test.go b/staging/src/k8s.io/apiserver/pkg/storageversion/updater_test.go new file mode 100644 index 00000000000..be899a280f1 --- /dev/null +++ b/staging/src/k8s.io/apiserver/pkg/storageversion/updater_test.go @@ -0,0 +1,102 @@ +/* +Copyright 2019 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storageversion + +import ( + "reflect" + "testing" + + "github.com/google/go-cmp/cmp" + "k8s.io/api/apiserverinternal/v1alpha1" +) + +func TestLocalUpdateStorageVersion(t *testing.T) { + v1 := "v1" + ssv1 := v1alpha1.ServerStorageVersion{ + APIServerID: "1", + EncodingVersion: "v1", + DecodableVersions: []string{"v1", "v2"}, + } + ssv2 := v1alpha1.ServerStorageVersion{ + APIServerID: "2", + EncodingVersion: "v1", + DecodableVersions: []string{"v1", "v2"}, + } + // ssv3 has a different encoding version + ssv3 := v1alpha1.ServerStorageVersion{ + APIServerID: "3", + EncodingVersion: "v2", + DecodableVersions: []string{"v1", "v2"}, + } + ssv4 := v1alpha1.ServerStorageVersion{ + APIServerID: "4", + EncodingVersion: "v1", + DecodableVersions: []string{"v1", "v2", "v4"}, + } + tests := []struct { + old v1alpha1.StorageVersionStatus + newSSV v1alpha1.ServerStorageVersion + expected v1alpha1.StorageVersionStatus + }{ + { + old: v1alpha1.StorageVersionStatus{}, + newSSV: ssv1, + expected: v1alpha1.StorageVersionStatus{ + StorageVersions: []v1alpha1.ServerStorageVersion{ssv1}, + CommonEncodingVersion: &v1, + }, + }, + { + old: v1alpha1.StorageVersionStatus{ + StorageVersions: []v1alpha1.ServerStorageVersion{ssv1, ssv2}, + CommonEncodingVersion: &v1, + }, + newSSV: ssv3, + expected: v1alpha1.StorageVersionStatus{ + StorageVersions: []v1alpha1.ServerStorageVersion{ssv1, ssv2, ssv3}, + }, + }, + { + old: v1alpha1.StorageVersionStatus{ + StorageVersions: []v1alpha1.ServerStorageVersion{ssv1, ssv2}, + CommonEncodingVersion: &v1, + }, + newSSV: ssv4, + expected: v1alpha1.StorageVersionStatus{ + StorageVersions: []v1alpha1.ServerStorageVersion{ssv1, ssv2, ssv4}, + CommonEncodingVersion: &v1, + }, + }, + { + old: v1alpha1.StorageVersionStatus{ + StorageVersions: []v1alpha1.ServerStorageVersion{ssv1, ssv2, ssv3}, + }, + newSSV: ssv4, + expected: v1alpha1.StorageVersionStatus{ + StorageVersions: []v1alpha1.ServerStorageVersion{ssv1, ssv2, ssv3, ssv4}, + }, + }, + } + + for _, tc := range tests { + sv := &v1alpha1.StorageVersion{Status: tc.old} + updated := localUpdateStorageVersion(sv, tc.newSSV.APIServerID, tc.newSSV.EncodingVersion, tc.newSSV.DecodableVersions) + if e, a := tc.expected, updated.Status; !reflect.DeepEqual(e, a) { + t.Errorf("unexpected: %v", cmp.Diff(e, a)) + } + } +}