diff --git a/cmd/kube-controller-manager/app/BUILD b/cmd/kube-controller-manager/app/BUILD index 55bc8e9a31b..bf098a302a9 100644 --- a/cmd/kube-controller-manager/app/BUILD +++ b/cmd/kube-controller-manager/app/BUILD @@ -70,6 +70,7 @@ go_library( "//pkg/controller/resourcequota:go_default_library", "//pkg/controller/serviceaccount:go_default_library", "//pkg/controller/statefulset:go_default_library", + "//pkg/controller/storageversiongc:go_default_library", "//pkg/controller/ttl:go_default_library", "//pkg/controller/ttlafterfinished:go_default_library", "//pkg/controller/volume/attachdetach:go_default_library", @@ -112,6 +113,7 @@ go_library( "//staging/src/k8s.io/apimachinery/pkg/util/sets:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/uuid:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//staging/src/k8s.io/apiserver/pkg/features:go_default_library", "//staging/src/k8s.io/apiserver/pkg/quota/v1/generic:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server:go_default_library", "//staging/src/k8s.io/apiserver/pkg/server/healthz:go_default_library", diff --git a/cmd/kube-controller-manager/app/controllermanager.go b/cmd/kube-controller-manager/app/controllermanager.go index 0c0b460d932..5bc022dd2f0 100644 --- a/cmd/kube-controller-manager/app/controllermanager.go +++ b/cmd/kube-controller-manager/app/controllermanager.go @@ -36,9 +36,11 @@ import ( "k8s.io/apimachinery/pkg/util/sets" "k8s.io/apimachinery/pkg/util/uuid" "k8s.io/apimachinery/pkg/util/wait" + genericfeatures "k8s.io/apiserver/pkg/features" "k8s.io/apiserver/pkg/server" "k8s.io/apiserver/pkg/server/healthz" "k8s.io/apiserver/pkg/server/mux" + utilfeature "k8s.io/apiserver/pkg/util/feature" cacheddiscovery "k8s.io/client-go/discovery/cached" "k8s.io/client-go/informers" clientset "k8s.io/client-go/kubernetes" @@ -424,6 +426,10 @@ func NewControllerInitializers(loopMode ControllerLoopMode) map[string]InitFunc 
controllers["ttl-after-finished"] = startTTLAfterFinishedController controllers["root-ca-cert-publisher"] = startRootCACertPublisher controllers["ephemeral-volume"] = startEphemeralVolumeController + if utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) && + utilfeature.DefaultFeatureGate.Enabled(genericfeatures.StorageVersionAPI) { + controllers["storage-version-gc"] = startStorageVersionGCController + } return controllers } diff --git a/cmd/kube-controller-manager/app/core.go b/cmd/kube-controller-manager/app/core.go index 4c9a841f5ae..832fcddac10 100644 --- a/cmd/kube-controller-manager/app/core.go +++ b/cmd/kube-controller-manager/app/core.go @@ -56,6 +56,7 @@ import ( replicationcontroller "k8s.io/kubernetes/pkg/controller/replication" resourcequotacontroller "k8s.io/kubernetes/pkg/controller/resourcequota" serviceaccountcontroller "k8s.io/kubernetes/pkg/controller/serviceaccount" + "k8s.io/kubernetes/pkg/controller/storageversiongc" ttlcontroller "k8s.io/kubernetes/pkg/controller/ttl" "k8s.io/kubernetes/pkg/controller/ttlafterfinished" "k8s.io/kubernetes/pkg/controller/volume/attachdetach" @@ -674,3 +675,12 @@ func getNodeCIDRMaskSizes(clusterCIDRs []*net.IPNet, maskSizeIPv4, maskSizeIPv6 } return nodeMaskCIDRs } + +func startStorageVersionGCController(ctx ControllerContext) (http.Handler, bool, error) { + go storageversiongc.NewStorageVersionGC( + ctx.ClientBuilder.ClientOrDie("storage-version-garbage-collector"), + ctx.InformerFactory.Coordination().V1().Leases(), + ctx.InformerFactory.Internal().V1alpha1().StorageVersions(), + ).Run(ctx.Stop) + return nil, true, nil +} diff --git a/pkg/controller/BUILD b/pkg/controller/BUILD index 38882d00443..595ed59c931 100644 --- a/pkg/controller/BUILD +++ b/pkg/controller/BUILD @@ -119,6 +119,7 @@ filegroup( "//pkg/controller/resourcequota:all-srcs", "//pkg/controller/serviceaccount:all-srcs", "//pkg/controller/statefulset:all-srcs", + "//pkg/controller/storageversiongc:all-srcs", 
"//pkg/controller/testutil:all-srcs", "//pkg/controller/ttl:all-srcs", "//pkg/controller/ttlafterfinished:all-srcs", diff --git a/pkg/controller/storageversiongc/BUILD b/pkg/controller/storageversiongc/BUILD new file mode 100644 index 00000000000..85d9f5f8332 --- /dev/null +++ b/pkg/controller/storageversiongc/BUILD @@ -0,0 +1,39 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "go_default_library", + srcs = ["gc_controller.go"], + importpath = "k8s.io/kubernetes/pkg/controller/storageversiongc", + visibility = ["//visibility:public"], + deps = [ + "//pkg/controlplane:go_default_library", + "//staging/src/k8s.io/api/apiserverinternal/v1alpha1:go_default_library", + "//staging/src/k8s.io/api/coordination/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/apis/meta/v1:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/errors:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/runtime:go_default_library", + "//staging/src/k8s.io/apimachinery/pkg/util/wait:go_default_library", + "//staging/src/k8s.io/client-go/informers/apiserverinternal/v1alpha1:go_default_library", + "//staging/src/k8s.io/client-go/informers/coordination/v1:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes:go_default_library", + "//staging/src/k8s.io/client-go/listers/coordination/v1:go_default_library", + "//staging/src/k8s.io/client-go/tools/cache:go_default_library", + "//staging/src/k8s.io/client-go/util/workqueue:go_default_library", + "//vendor/k8s.io/klog/v2:go_default_library", + ], +) + +filegroup( + name = "package-srcs", + srcs = glob(["**"]), + tags = ["automanaged"], + visibility = ["//visibility:private"], +) + +filegroup( + name = "all-srcs", + srcs = [":package-srcs"], + tags = ["automanaged"], + visibility = ["//visibility:public"], +) diff --git a/pkg/controller/storageversiongc/gc_controller.go 
b/pkg/controller/storageversiongc/gc_controller.go
new file mode 100644
index 00000000000..0de4e94f7d2
--- /dev/null
+++ b/pkg/controller/storageversiongc/gc_controller.go
@@ -0,0 +1,337 @@
+/*
+Copyright 2020 The Kubernetes Authors.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+*/
+
+package storageversiongc
+
+import (
+	"context"
+	"fmt"
+	"time"
+
+	apiserverinternalv1alpha1 "k8s.io/api/apiserverinternal/v1alpha1"
+	coordinationv1 "k8s.io/api/coordination/v1"
+	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	utilerrors "k8s.io/apimachinery/pkg/util/errors"
+	utilruntime "k8s.io/apimachinery/pkg/util/runtime"
+	"k8s.io/apimachinery/pkg/util/wait"
+	apiserverinternalinformers "k8s.io/client-go/informers/apiserverinternal/v1alpha1"
+	coordinformers "k8s.io/client-go/informers/coordination/v1"
+	"k8s.io/client-go/kubernetes"
+	coordlisters "k8s.io/client-go/listers/coordination/v1"
+	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/util/workqueue"
+	"k8s.io/kubernetes/pkg/controlplane"
+
+	"k8s.io/klog/v2"
+)
+
+// Controller watches kube-apiserver identity leases and StorageVersion objects, and
+// deletes stale storage version entries and objects.
+type Controller struct {
+	// kubeclientset issues live reads and writes against the API server.
+	kubeclientset kubernetes.Interface
+
+	// leaseLister reads leases from the lease informer's cache.
+	leaseLister  coordlisters.LeaseLister
+	leasesSynced cache.InformerSynced
+
+	storageVersionSynced cache.InformerSynced
+
+	// leaseQueue holds the names of deleted kube-apiserver identity leases.
+	leaseQueue workqueue.RateLimitingInterface
+
+	// storageVersionQueue holds the names of StorageVersions that reference an
+	// apiserver ID with no matching identity lease in the informer cache.
+	storageVersionQueue workqueue.RateLimitingInterface
+}
+
+// NewStorageVersionGC creates a new Controller and registers its event handlers on
+// the given lease and storage version informers. It does not start the informers.
+func NewStorageVersionGC(clientset kubernetes.Interface, leaseInformer coordinformers.LeaseInformer, storageVersionInformer apiserverinternalinformers.StorageVersionInformer) *Controller {
+	c := &Controller{
+		kubeclientset:        clientset,
+		leaseLister:          leaseInformer.Lister(),
+		leasesSynced:         leaseInformer.Informer().HasSynced,
+		storageVersionSynced: storageVersionInformer.Informer().HasSynced,
+		leaseQueue:           workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "storage_version_garbage_collector_leases"),
+		storageVersionQueue:  workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "storage_version_garbage_collector_storageversions"),
+	}
+
+	leaseInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		DeleteFunc: c.onDeleteLease,
+	})
+	// use the default resync period from the informer
+	storageVersionInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+		AddFunc:    c.onAddStorageVersion,
+		UpdateFunc: c.onUpdateStorageVersion,
+	})
+
+	return c
+}
+
+// Run waits for the informer caches to sync, starts one worker per queue, and
+// blocks until stopCh is closed.
+func (c *Controller) Run(stopCh <-chan struct{}) {
+	defer utilruntime.HandleCrash()
+	defer c.leaseQueue.ShutDown()
+	defer c.storageVersionQueue.ShutDown()
+	defer klog.Infof("Shutting down storage version garbage collector")
+
+	klog.Infof("Starting storage version garbage collector")
+
+	if !cache.WaitForCacheSync(stopCh, c.leasesSynced, c.storageVersionSynced) {
+		utilruntime.HandleError(fmt.Errorf("timed out waiting for caches to sync"))
+		return
+	}
+
+	// Identity lease deletion and storageversion update don't happen too often. Start one
+	// worker for each of them.
+	// runLeaseWorker handles legit identity lease deletion, while runStorageVersionWorker
+	// handles storageversion creation/update with non-existing id. The latter should rarely
+	// happen. It's okay for the two workers to conflict on update.
+	go wait.Until(c.runLeaseWorker, time.Second, stopCh)
+	go wait.Until(c.runStorageVersionWorker, time.Second, stopCh)
+
+	<-stopCh
+}
+
+// runLeaseWorker drains leaseQueue until the queue is shut down.
+func (c *Controller) runLeaseWorker() {
+	for c.processNextLease() {
+	}
+}
+
+// processNextLease handles one key from leaseQueue, re-adding it rate-limited on
+// failure. It returns false once the queue is shut down.
+func (c *Controller) processNextLease() bool {
+	key, quit := c.leaseQueue.Get()
+	if quit {
+		return false
+	}
+	defer c.leaseQueue.Done(key)
+
+	err := c.processDeletedLease(key.(string))
+	if err == nil {
+		c.leaseQueue.Forget(key)
+		return true
+	}
+
+	utilruntime.HandleError(fmt.Errorf("lease %v failed with: %v", key, err))
+	c.leaseQueue.AddRateLimited(key)
+	return true
+}
+
+// runStorageVersionWorker drains storageVersionQueue until the queue is shut down.
+func (c *Controller) runStorageVersionWorker() {
+	for c.processNextStorageVersion() {
+	}
+}
+
+// processNextStorageVersion handles one key from storageVersionQueue, re-adding it
+// rate-limited on failure. It returns false once the queue is shut down.
+func (c *Controller) processNextStorageVersion() bool {
+	key, quit := c.storageVersionQueue.Get()
+	if quit {
+		return false
+	}
+	defer c.storageVersionQueue.Done(key)
+
+	err := c.syncStorageVersion(key.(string))
+	if err == nil {
+		c.storageVersionQueue.Forget(key)
+		return true
+	}
+
+	utilruntime.HandleError(fmt.Errorf("storage version %v failed with: %v", key, err))
+	c.storageVersionQueue.AddRateLimited(key)
+	return true
+}
+
+// processDeletedLease removes the entries recorded for the apiserver ID name from every
+// StorageVersion, after confirming with a live read that the identity lease is gone.
+func (c *Controller) processDeletedLease(name string) error {
+	_, err := c.kubeclientset.CoordinationV1().Leases(metav1.NamespaceSystem).Get(context.TODO(), name, metav1.GetOptions{})
+	// the lease isn't deleted, nothing we need to do here
+	if err == nil {
+		return nil
+	}
+	if !apierrors.IsNotFound(err) {
+		return err
+	}
+	// the frequency of this call won't be too high because we only trigger on identity lease deletions
+	storageVersionList, err := c.kubeclientset.InternalV1alpha1().StorageVersions().List(context.TODO(), metav1.ListOptions{})
+	if err != nil {
+		return err
+	}
+
+	var errors []error
+	for i := range storageVersionList.Items {
+		// Point at the list element rather than at the range variable, which is
+		// reused across iterations before Go 1.22.
+		sv := &storageVersionList.Items[i]
+		var serverStorageVersions []apiserverinternalv1alpha1.ServerStorageVersion
+		hasStaleRecord := false
+		for _, ssv := range sv.Status.StorageVersions {
+			if ssv.APIServerID == name {
+				hasStaleRecord = true
+				continue
+			}
+			serverStorageVersions = append(serverStorageVersions, ssv)
+		}
+		if !hasStaleRecord {
+			continue
+		}
+		if err := c.updateOrDeleteStorageVersion(sv, serverStorageVersions); err != nil {
+			errors = append(errors, err)
+		}
+	}
+
+	return utilerrors.NewAggregate(errors)
+}
+
+// syncStorageVersion removes from the named StorageVersion every entry whose apiserver
+// ID has no identity lease. Lease lookups that fail for any reason other than NotFound
+// are returned so the key is retried.
+func (c *Controller) syncStorageVersion(name string) error {
+	sv, err := c.kubeclientset.InternalV1alpha1().StorageVersions().Get(context.TODO(), name, metav1.GetOptions{})
+	if apierrors.IsNotFound(err) {
+		// The problematic storage version that was added/updated recently is gone.
+		// Nothing we need to do here.
+		return nil
+	}
+	if err != nil {
+		return err
+	}
+
+	hasInvalidID := false
+	var serverStorageVersions []apiserverinternalv1alpha1.ServerStorageVersion
+	for _, v := range sv.Status.StorageVersions {
+		lease, err := c.kubeclientset.CoordinationV1().Leases(metav1.NamespaceSystem).Get(context.TODO(), v.APIServerID, metav1.GetOptions{})
+		if err != nil && !apierrors.IsNotFound(err) {
+			// A failed lookup says nothing about whether the lease exists. Retry later
+			// instead of garbage-collecting an entry that may still be valid.
+			return err
+		}
+		if err != nil || !isAPIServerIdentityLease(lease) {
+			// We cannot find a corresponding identity lease from apiserver as well.
+			// We need to clean up this storage version.
+			hasInvalidID = true
+			continue
+		}
+		serverStorageVersions = append(serverStorageVersions, v)
+	}
+	if !hasInvalidID {
+		return nil
+	}
+	return c.updateOrDeleteStorageVersion(sv, serverStorageVersions)
+}
+
+// isAPIServerIdentityLease reports whether lease carries the kube-apiserver identity
+// label. A nil lease or a lease without labels is not an identity lease.
+func isAPIServerIdentityLease(lease *coordinationv1.Lease) bool {
+	// Indexing a nil map yields the zero value, so Labels needs no separate nil check.
+	return lease != nil &&
+		lease.Labels[controlplane.IdentityLeaseComponentLabelKey] == controlplane.KubeAPIServer
+}
+
+// onAddStorageVersion is the informer add handler for StorageVersions.
+func (c *Controller) onAddStorageVersion(obj interface{}) {
+	castObj := obj.(*apiserverinternalv1alpha1.StorageVersion)
+	c.enqueueStorageVersion(castObj)
+}
+
+// onUpdateStorageVersion is the informer update handler for StorageVersions.
+func (c *Controller) onUpdateStorageVersion(oldObj, newObj interface{}) {
+	castNewObj := newObj.(*apiserverinternalv1alpha1.StorageVersion)
+	c.enqueueStorageVersion(castNewObj)
+}
+
+// enqueueStorageVersion enqueues the storage version if it has an entry for an invalid apiserver
+func (c *Controller) enqueueStorageVersion(obj *apiserverinternalv1alpha1.StorageVersion) {
+	for _, sv := range obj.Status.StorageVersions {
+		lease, err := c.leaseLister.Leases(metav1.NamespaceSystem).Get(sv.APIServerID)
+		if err != nil || !isAPIServerIdentityLease(lease) {
+			// we cannot find a corresponding identity lease in cache, enqueue the storageversion
+			klog.V(4).Infof("Observed storage version %s with invalid apiserver entry", obj.Name)
+			c.storageVersionQueue.Add(obj.Name)
+			return
+		}
+	}
+}
+
+// onDeleteLease is the informer delete handler for leases. It unwraps tombstones and
+// enqueues deleted kube-apiserver identity leases from the kube-system namespace.
+func (c *Controller) onDeleteLease(obj interface{}) {
+	castObj, ok := obj.(*coordinationv1.Lease)
+	if !ok {
+		tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
+		if !ok {
+			utilruntime.HandleError(fmt.Errorf("couldn't get object from tombstone %#v", obj))
+			return
+		}
+		castObj, ok = tombstone.Obj.(*coordinationv1.Lease)
+		if !ok {
+			utilruntime.HandleError(fmt.Errorf("tombstone contained object that is not a Lease %#v", obj))
+			return
+		}
+	}
+
+	if castObj.Namespace == metav1.NamespaceSystem && isAPIServerIdentityLease(castObj) {
+		klog.V(4).Infof("Observed lease %s deleted", castObj.Name)
+		c.enqueueLease(castObj)
+	}
+}
+
+// enqueueLease adds the lease's name to leaseQueue.
+func (c *Controller) enqueueLease(obj *coordinationv1.Lease) {
+	c.leaseQueue.Add(obj.Name)
+}
+
+// setCommonEncodingVersion sets CommonEncodingVersion to the encoding version shared by
+// all entries, or to nil when they disagree. With no entries it leaves sv unchanged.
+func setCommonEncodingVersion(sv *apiserverinternalv1alpha1.StorageVersion) {
+	if len(sv.Status.StorageVersions) == 0 {
+		return
+	}
+	firstVersion := sv.Status.StorageVersions[0].EncodingVersion
+	agreed := true
+	for _, ssv := range sv.Status.StorageVersions {
+		if ssv.EncodingVersion != firstVersion {
+			agreed = false
+			break
+		}
+	}
+	if agreed {
+		sv.Status.CommonEncodingVersion = &firstVersion
+	} else {
+		sv.Status.CommonEncodingVersion = nil
+	}
+}
+
+// updateOrDeleteStorageVersion deletes sv when no entries remain; otherwise it writes
+// the remaining entries and the recomputed common encoding version to sv's status.
+func (c *Controller) updateOrDeleteStorageVersion(sv *apiserverinternalv1alpha1.StorageVersion, serverStorageVersions []apiserverinternalv1alpha1.ServerStorageVersion) error {
+	if len(serverStorageVersions) == 0 {
+		return c.kubeclientset.InternalV1alpha1().StorageVersions().Delete(
+			context.TODO(), sv.Name, metav1.DeleteOptions{})
+	}
+	sv.Status.StorageVersions = serverStorageVersions
+	setCommonEncodingVersion(sv)
+	_, err := c.kubeclientset.InternalV1alpha1().StorageVersions().UpdateStatus(
+		context.TODO(), sv, metav1.UpdateOptions{})
+	return err
+}
diff --git a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/storageversion.go b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/storageversion.go
index 265cded2ec9..414fc194ef4 100644
--- a/staging/src/k8s.io/apiserver/pkg/endpoints/filters/storageversion.go
+++ b/staging/src/k8s.io/apiserver/pkg/endpoints/filters/storageversion.go
@@ -22,8 +22,10 @@ import (
 	"net/http"
 
 	apierrors "k8s.io/apimachinery/pkg/api/errors"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/apimachinery/pkg/runtime"
 	"k8s.io/apimachinery/pkg/runtime/schema"
+	"k8s.io/apiserver/pkg/authentication/user"
 	"k8s.io/apiserver/pkg/endpoints/handlers/responsewriters"
 	"k8s.io/apiserver/pkg/endpoints/request"
 	"k8s.io/apiserver/pkg/storageversion"
@@ -36,7 +38,9 @@ import (
 // 1. non-resource requests,
 // 2. read requests,
 // 3. write requests to the storageversion API,
-// 4.
resources whose StorageVersion is not pending update, including non-persisted resources. +// 4. create requests to the namespace API sent by apiserver itself, +// 5. write requests to the lease API in kube-system namespace, +// 6. resources whose StorageVersion is not pending update, including non-persisted resources. func WithStorageVersionPrecondition(handler http.Handler, svm storageversion.Manager, s runtime.NegotiatedSerializer) http.Handler { if svm == nil { // TODO(roycaihw): switch to warning after the feature graduate to beta/GA @@ -69,6 +73,31 @@ func WithStorageVersionPrecondition(handler http.Handler, svm storageversion.Man handler.ServeHTTP(w, req) return } + // The system namespace is required for apiserver-identity lease to exist. Allow the apiserver + // itself to create namespaces. + // NOTE: with this exception, if the bootstrap client writes namespaces with a new version, + // and the upgraded apiserver dies before updating the StorageVersion for namespaces, the + // storage migrator won't be able to tell these namespaces are stored in a different version in etcd. + // Because the bootstrap client only creates system namespace and doesn't update them, this can + // only happen if the upgraded apiserver is the first apiserver that kicks off namespace creation, + // or if an upgraded server that joins an existing cluster has new system namespaces (other + // than kube-system, kube-public, kube-node-lease) that need to be created. + u, hasUser := request.UserFrom(ctx) + if requestInfo.APIGroup == "" && requestInfo.Resource == "namespaces" && + requestInfo.Verb == "create" && hasUser && + u.GetName() == user.APIServerUser && contains(u.GetGroups(), user.SystemPrivilegedGroup) { + handler.ServeHTTP(w, req) + return + } + // Allow writes to the lease API in kube-system. The storage version API depends on the + // apiserver-identity leases to operate. 
Leases in kube-system are either apiserver-identity + // lease (which gets garbage collected when stale) or leader-election leases (which gets + // periodically updated by system components). Both types of leases won't be stale in etcd. + if requestInfo.APIGroup == "coordination.k8s.io" && requestInfo.Resource == "leases" && + requestInfo.Namespace == metav1.NamespaceSystem { + handler.ServeHTTP(w, req) + return + } // If the resource's StorageVersion is not in the to-be-updated list, let it pass. // Non-persisted resources are not in the to-be-updated list, so they will pass. gr := schema.GroupResource{requestInfo.APIGroup, requestInfo.Resource} @@ -81,3 +110,12 @@ func WithStorageVersionPrecondition(handler http.Handler, svm storageversion.Man responsewriters.ErrorNegotiated(apierrors.NewServiceUnavailable(fmt.Sprintf("wait for storage version registration to complete for resource: %v, last seen error: %v", gr, svm.LastUpdateError(gr))), s, gv, w, req) }) } + +func contains(s []string, e string) bool { + for _, a := range s { + if a == e { + return true + } + } + return false +} diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/BUILD b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/BUILD index aed87fbe3ef..41956d67395 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/BUILD +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/BUILD @@ -68,6 +68,7 @@ go_library( "//staging/src/k8s.io/apiserver/pkg/server/storage:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/proxy:go_default_library", + "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/listers/core/v1:go_default_library", "//staging/src/k8s.io/client-go/pkg/version:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", diff --git a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go 
b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go index 59baa74c37f..d0ab3186a9a 100644 --- a/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go +++ b/staging/src/k8s.io/kube-aggregator/pkg/apiserver/apiserver.go @@ -17,10 +17,12 @@ limitations under the License. package apiserver import ( + "context" "fmt" "net/http" "time" + apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/util/sets" @@ -30,6 +32,7 @@ import ( "k8s.io/apiserver/pkg/server/egressselector" serverstorage "k8s.io/apiserver/pkg/server/storage" utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/client-go/kubernetes" "k8s.io/client-go/pkg/version" openapicommon "k8s.io/kube-openapi/pkg/common" @@ -271,7 +274,28 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg utilfeature.DefaultFeatureGate.Enabled(genericfeatures.APIServerIdentity) { // Spawn a goroutine in aggregator apiserver to update storage version for // all built-in resources - s.GenericAPIServer.AddPostStartHookOrDie("built-in-resources-storage-version-updater", func(context genericapiserver.PostStartHookContext) error { + s.GenericAPIServer.AddPostStartHookOrDie("built-in-resources-storage-version-updater", func(hookContext genericapiserver.PostStartHookContext) error { + // Wait for apiserver-identity to exist first before updating storage + // versions, to avoid storage version GC accidentally garbage-collecting + // storage versions. 
+ kubeClient, err := kubernetes.NewForConfig(hookContext.LoopbackClientConfig) + if err != nil { + return err + } + if err := wait.PollImmediateUntil(100*time.Millisecond, func() (bool, error) { + _, err := kubeClient.CoordinationV1().Leases(metav1.NamespaceSystem).Get( + context.TODO(), s.GenericAPIServer.APIServerID, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + return false, nil + } + if err != nil { + return false, err + } + return true, nil + }, hookContext.StopCh); err != nil { + return fmt.Errorf("failed to wait for apiserver-identity lease %s to be created: %v", + s.GenericAPIServer.APIServerID, err) + } // Technically an apiserver only needs to update storage version once during bootstrap. // Reconcile StorageVersion objects every 10 minutes will help in the case that the // StorageVersion objects get accidentally modified/deleted by a different agent. In that @@ -282,16 +306,16 @@ func (c completedConfig) NewWithDelegate(delegationTarget genericapiserver.Deleg // All apiservers (aggregator-apiserver, kube-apiserver, apiextensions-apiserver) // share the same generic apiserver config. The same StorageVersion manager is used // to register all built-in resources when the generic apiservers install APIs. - s.GenericAPIServer.StorageVersionManager.UpdateStorageVersions(context.LoopbackClientConfig, s.GenericAPIServer.APIServerID) + s.GenericAPIServer.StorageVersionManager.UpdateStorageVersions(hookContext.LoopbackClientConfig, s.GenericAPIServer.APIServerID) return false, nil - }, context.StopCh) + }, hookContext.StopCh) // Once the storage version updater finishes the first round of update, // the PostStartHook will return to unblock /healthz. The handler chain // won't block write requests anymore. Check every second since it's not // expensive. 
wait.PollImmediateUntil(1*time.Second, func() (bool, error) { return s.GenericAPIServer.StorageVersionManager.Completed(), nil - }, context.StopCh) + }, hookContext.StopCh) return nil }) } diff --git a/test/integration/storageversion/BUILD b/test/integration/storageversion/BUILD index a9336acb52d..ddde0480621 100644 --- a/test/integration/storageversion/BUILD +++ b/test/integration/storageversion/BUILD @@ -3,14 +3,18 @@ load("@io_bazel_rules_go//go:def.bzl", "go_test") go_test( name = "go_default_test", srcs = [ + "gc_test.go", "storage_version_filter_test.go", "storage_version_main_test.go", ], tags = ["integration"], deps = [ "//cmd/kube-apiserver/app/testing:go_default_library", + "//pkg/controller/storageversiongc:go_default_library", + "//pkg/controlplane:go_default_library", "//staging/src/k8s.io/api/apiserverinternal/v1alpha1:go_default_library", "//staging/src/k8s.io/api/authentication/v1:go_default_library", + "//staging/src/k8s.io/api/coordination/v1:go_default_library", "//staging/src/k8s.io/api/core/v1:go_default_library", "//staging/src/k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset:go_default_library", "//staging/src/k8s.io/apimachinery/pkg/api/errors:go_default_library", @@ -22,6 +26,7 @@ go_test( "//staging/src/k8s.io/apiserver/pkg/storageversion:go_default_library", "//staging/src/k8s.io/apiserver/pkg/util/feature:go_default_library", "//staging/src/k8s.io/client-go/dynamic:go_default_library", + "//staging/src/k8s.io/client-go/informers:go_default_library", "//staging/src/k8s.io/client-go/kubernetes:go_default_library", "//staging/src/k8s.io/client-go/rest:go_default_library", "//staging/src/k8s.io/component-base/featuregate/testing:go_default_library", @@ -29,6 +34,7 @@ go_test( "//staging/src/k8s.io/kube-aggregator/pkg/client/clientset_generated/clientset:go_default_library", "//test/integration/etcd:go_default_library", "//test/integration/framework:go_default_library", + "//vendor/k8s.io/utils/pointer:go_default_library", ], ) 
diff --git a/test/integration/storageversion/gc_test.go b/test/integration/storageversion/gc_test.go new file mode 100644 index 00000000000..659b218f587 --- /dev/null +++ b/test/integration/storageversion/gc_test.go @@ -0,0 +1,247 @@ +/* +Copyright 2020 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package storageversion + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + apiserverinternalv1alpha1 "k8s.io/api/apiserverinternal/v1alpha1" + coordinationv1 "k8s.io/api/coordination/v1" + apierrors "k8s.io/apimachinery/pkg/api/errors" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/apiserver/pkg/features" + utilfeature "k8s.io/apiserver/pkg/util/feature" + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" + featuregatetesting "k8s.io/component-base/featuregate/testing" + kubeapiservertesting "k8s.io/kubernetes/cmd/kube-apiserver/app/testing" + "k8s.io/kubernetes/pkg/controller/storageversiongc" + "k8s.io/kubernetes/pkg/controlplane" + "k8s.io/kubernetes/test/integration/framework" + "k8s.io/utils/pointer" +) + +const ( + svName = "storageversion.integration.test.foos" + idA = "id-1" + idB = "id-2" + idNonExist = "id-non-exist" +) + +func TestStorageVersionGarbageCollection(t *testing.T) { + defer featuregatetesting.SetFeatureGateDuringTest(t, utilfeature.DefaultFeatureGate, features.APIServerIdentity, true)() + defer featuregatetesting.SetFeatureGateDuringTest(t, 
utilfeature.DefaultFeatureGate, features.StorageVersionAPI, true)() + result := kubeapiservertesting.StartTestServerOrDie(t, nil, nil, framework.SharedEtcd()) + defer result.TearDownFn() + + kubeclient, err := kubernetes.NewForConfig(result.ClientConfig) + if err != nil { + t.Fatalf("Unexpected error: %v", err) + } + + informers := informers.NewSharedInformerFactory(kubeclient, time.Second) + leaseInformer := informers.Coordination().V1().Leases() + storageVersionInformer := informers.Internal().V1alpha1().StorageVersions() + + controller := storageversiongc.NewStorageVersionGC(kubeclient, leaseInformer, storageVersionInformer) + + stopCh := make(chan struct{}) + defer close(stopCh) + go leaseInformer.Informer().Run(stopCh) + go storageVersionInformer.Informer().Run(stopCh) + go controller.Run(stopCh) + + createTestAPIServerIdentityLease(t, kubeclient, idA) + createTestAPIServerIdentityLease(t, kubeclient, idB) + + t.Run("storage version with non-existing id should be GC'ed", func(t *testing.T) { + createTestStorageVersion(t, kubeclient, idNonExist) + assertStorageVersionDeleted(t, kubeclient) + }) + + t.Run("storage version with valid id should not be GC'ed", func(t *testing.T) { + createTestStorageVersion(t, kubeclient, idA) + time.Sleep(10 * time.Second) + sv, err := kubeclient.InternalV1alpha1().StorageVersions().Get( + context.TODO(), svName, metav1.GetOptions{}) + if err != nil { + t.Fatalf("failed to retrieve valid storage version: %v", err) + } + if len(sv.Status.StorageVersions) != 1 { + t.Errorf("unexpected number of storage version entries, expected 1, got: %v", + sv.Status.StorageVersions) + } + expectedID := idA + if sv.Status.StorageVersions[0].APIServerID != expectedID { + t.Errorf("unexpected storage version entry id, expected %v, got: %v", + expectedID, sv.Status.StorageVersions[0].APIServerID) + } + assertCommonEncodingVersion(t, kubeclient, pointer.StringPtr(idToVersion(t, idA))) + if err := kubeclient.InternalV1alpha1().StorageVersions().Delete( 
+ context.TODO(), svName, metav1.DeleteOptions{}); err != nil { + t.Fatalf("failed to cleanup valid storage version: %v", err) + } + }) + + t.Run("deleting an id should delete a storage version entry that it owns", func(t *testing.T) { + createTestStorageVersion(t, kubeclient, idA, idB) + assertStorageVersionEntries(t, kubeclient, 2, idA) + assertCommonEncodingVersion(t, kubeclient, nil) + deleteTestAPIServerIdentityLease(t, kubeclient, idA) + assertStorageVersionEntries(t, kubeclient, 1, idB) + assertCommonEncodingVersion(t, kubeclient, pointer.StringPtr(idToVersion(t, idB))) + }) + + t.Run("deleting an id should delete a storage version object that it owns entirely", func(t *testing.T) { + deleteTestAPIServerIdentityLease(t, kubeclient, idB) + assertStorageVersionDeleted(t, kubeclient) + }) +} + +func createTestStorageVersion(t *testing.T, client kubernetes.Interface, ids ...string) { + sv := &apiserverinternalv1alpha1.StorageVersion{ + ObjectMeta: metav1.ObjectMeta{ + Name: svName, + }, + } + for _, id := range ids { + version := idToVersion(t, id) + v := apiserverinternalv1alpha1.ServerStorageVersion{ + APIServerID: id, + EncodingVersion: version, + DecodableVersions: []string{version}, + } + sv.Status.StorageVersions = append(sv.Status.StorageVersions, v) + } + // every id is unique and creates a different version. We know we have a common encoding + // version when there is only one id. 
Pick it + if len(ids) == 1 { + sv.Status.CommonEncodingVersion = pointer.StringPtr(sv.Status.StorageVersions[0].EncodingVersion) + } + + createdSV, err := client.InternalV1alpha1().StorageVersions().Create(context.TODO(), sv, metav1.CreateOptions{}) + if err != nil { + t.Fatalf("failed to create storage version %s: %v", svName, err) + } + // update the created sv with intended status + createdSV.Status = sv.Status + if _, err := client.InternalV1alpha1().StorageVersions().UpdateStatus( + context.TODO(), createdSV, metav1.UpdateOptions{}); err != nil { + t.Fatalf("failed to update store version status: %v", err) + } +} + +func assertStorageVersionDeleted(t *testing.T, client kubernetes.Interface) { + if err := wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (bool, error) { + _, err := client.InternalV1alpha1().StorageVersions().Get( + context.TODO(), svName, metav1.GetOptions{}) + if apierrors.IsNotFound(err) { + return true, nil + } + if err != nil { + return false, err + } + return false, nil + }); err != nil { + t.Fatalf("failed to wait for storageversion garbage collection: %v", err) + } +} + +func createTestAPIServerIdentityLease(t *testing.T, client kubernetes.Interface, name string) { + lease := &coordinationv1.Lease{ + ObjectMeta: metav1.ObjectMeta{ + Name: name, + Namespace: metav1.NamespaceSystem, + Labels: map[string]string{ + controlplane.IdentityLeaseComponentLabelKey: controlplane.KubeAPIServer, + }, + }, + Spec: coordinationv1.LeaseSpec{ + HolderIdentity: pointer.StringPtr(name), + LeaseDurationSeconds: pointer.Int32Ptr(3600), + // create fresh leases + AcquireTime: &metav1.MicroTime{Time: time.Now()}, + RenewTime: &metav1.MicroTime{Time: time.Now()}, + }, + } + if _, err := client.CoordinationV1().Leases(metav1.NamespaceSystem).Create( + context.TODO(), lease, metav1.CreateOptions{}); err != nil { + t.Fatalf("failed to create apiserver identity lease %s: %v", name, err) + } +} + +func deleteTestAPIServerIdentityLease(t *testing.T, 
client kubernetes.Interface, name string) { + if err := client.CoordinationV1().Leases(metav1.NamespaceSystem).Delete( + context.TODO(), name, metav1.DeleteOptions{}); err != nil { + t.Fatalf("failed to delete apiserver identity lease %s: %v", name, err) + } +} + +func assertStorageVersionEntries(t *testing.T, client kubernetes.Interface, + numEntries int, firstID string) { + var lastErr error + if err := wait.PollImmediate(100*time.Millisecond, 10*time.Second, func() (bool, error) { + sv, err := client.InternalV1alpha1().StorageVersions().Get( + context.TODO(), svName, metav1.GetOptions{}) + if err != nil { + return false, err + } + if len(sv.Status.StorageVersions) != numEntries { + lastErr = fmt.Errorf("unexpected number of storage version entries, expected %v, got: %v", + numEntries, len(sv.Status.StorageVersions)) + return false, nil + } + if sv.Status.StorageVersions[0].APIServerID != firstID { + lastErr = fmt.Errorf("unexpected first storage version entry id, expected %v, got: %v", + firstID, sv.Status.StorageVersions[0].APIServerID) + return false, nil + } + return true, nil + }); err != nil { + t.Fatalf("failed to get expected storage version entries: %v, last error: %v", err, lastErr) + } +} + +func assertCommonEncodingVersion(t *testing.T, client kubernetes.Interface, e *string) { + sv, err := client.InternalV1alpha1().StorageVersions().Get( + context.TODO(), svName, metav1.GetOptions{}) + if err != nil { + t.Fatalf("failed to retrieve storage version: %v", err) + } + if e == nil { + if sv.Status.CommonEncodingVersion != nil { + t.Errorf("unexpected non-nil common encoding version: %v", sv.Status.CommonEncodingVersion) + } + return + } + if sv.Status.CommonEncodingVersion == nil || *sv.Status.CommonEncodingVersion != *e { + t.Errorf("unexpected common encoding version, expected: %v, got %v", e, sv.Status.CommonEncodingVersion) + } +} + +func idToVersion(t *testing.T, id string) string { + // TODO(roycaihw): rewrite the test, use an id-version table + if
!strings.HasPrefix(id, "id-") { + t.Fatalf("should not happen: test using id without id- prefix: %s", id) + } + return fmt.Sprintf("v%s", strings.TrimPrefix(id, "id-")) +} diff --git a/test/integration/storageversion/storage_version_filter_test.go b/test/integration/storageversion/storage_version_filter_test.go index 1bc4c64ee29..df6c2b5b1da 100644 --- a/test/integration/storageversion/storage_version_filter_test.go +++ b/test/integration/storageversion/storage_version_filter_test.go @@ -82,7 +82,7 @@ func assertBlocking(name string, t *testing.T, err error, shouldBlock bool) { func testBuiltinResourceWrite(t *testing.T, cfg *rest.Config, shouldBlock bool) { client := clientset.NewForConfigOrDie(cfg) - _, err := client.CoreV1().Namespaces().Create(context.TODO(), &v1.Namespace{ObjectMeta: metav1.ObjectMeta{Name: "test"}}, metav1.CreateOptions{}) + _, err := client.CoreV1().ConfigMaps("default").Create(context.TODO(), &v1.ConfigMap{ObjectMeta: metav1.ObjectMeta{Name: "test"}}, metav1.CreateOptions{}) assertBlocking("writes to built in resources", t, err, shouldBlock) }